Biti razvijalec knjižnice (kriptografske primitive GOST v čistem Pythonu), pogosto prejmem vprašanja o tem, kako implementirati najpreprostejše varno sporočanje na koleno. Mnogi ljudje menijo, da je uporabna kriptografija precej preprosta in da bo klic .encrypt() na blokovni šifri dovolj za varno pošiljanje prek komunikacijskega kanala. Drugi verjamejo, da je uporabna kriptografija usoda redkih in da je sprejemljivo, da bogata podjetja, kot je Telegram z olimpijadami-matematiki varen protokol.
Vse to me je spodbudilo, da sem napisal ta članek, da pokažem, da implementacija kriptografskih protokolov in varnega IM ni tako težka naloga. Vendar pa ni vredno izumljati lastnih protokolov za preverjanje pristnosti in dogovora o ključih.

Članek bo napisal , , instant messenger z avtentikacijo in protokol dogovora o ključu (na podlagi katerega se izvaja ), z uporabo izključno GOST kriptografskih algoritmov knjižnice PyGOST in knjižnice za kodiranje sporočil ASN.1 (o čemer sem že ). Pogoj: biti mora tako preprost, da ga je mogoče napisati iz nič v enem večeru (ali delovnem dnevu), sicer ni več enostaven program. Verjetno ima napake, nepotrebne zaplete, pomanjkljivosti, poleg tega je to moj prvi program, ki uporablja knjižnico asyncio.
Oblikovanje IM
Najprej moramo razumeti, kako bo izgledal naš IM. Zaradi poenostavitve naj bo to omrežje enakovrednih, brez kakršnega koli odkrivanja udeležencev. Osebno bomo navedli, na kateri naslov: vrata se povezati za komunikacijo s sogovornikom.
Razumem, da je v tem trenutku predpostavka, da je na voljo neposredna komunikacija med dvema poljubnima računalnikoma, pomembna omejitev za uporabnost IM v praksi. Toda bolj kot bodo razvijalci izvajali vse vrste bergel za prečkanje NAT, dlje bomo ostali na internetu IPv4, z depresivno verjetnostjo komunikacije med poljubnimi računalniki. Kako dolgo lahko tolerirate pomanjkanje IPv6 doma in v službi?
Imeli bomo mrežo prijatelja do prijatelja: vsi možni sogovorniki morajo biti znani vnaprej. Prvič, to močno poenostavi vse: predstavili smo se, našli ali ne našli imena/ključa, prekinili povezavo ali nadaljevali z delom, poznali sogovornika. Drugič, na splošno je varen in odpravlja številne napade.
IM vmesnik bo blizu klasičnim rešitvam , ki so mi zelo všeč zaradi njihovega minimalizma in Unixove filozofije. Program IM za vsakega sogovornika ustvari imenik s tremi domenskimi vtičnicami Unix:
- v—v njej se zabeležijo sporočila, poslana sogovorniku;
- ven - iz njega se berejo sporočila, prejeta od sogovornika;
- stanje - z branjem iz njega izvemo, ali je sogovornik trenutno povezan, naslov/vrata povezave.
Poleg tega se ustvari conn socket, tako da zapišemo gostiteljska vrata, v katera sprožimo povezavo z oddaljenim sogovornikom.
|-- alice
| |-- in
| |-- out
| `-- state
|-- bob
| |-- in
| |-- out
| `-- state
`- conn
Ta pristop vam omogoča neodvisno implementacijo IM transporta in uporabniškega vmesnika, ker ni prijatelja, ne morete ugoditi vsem. Uporaba in / ali , lahko dobite večokenski vmesnik s označevanjem sintakse. In s pomočjo lahko dobite vrstico za vnos sporočila, združljivo z GNU Readline.
Pravzaprav nesramni projekti uporabljajo datoteke FIFO. Osebno nisem mogel razumeti, kako delati z datotekami konkurenčno v asyncio brez ročno napisanega ozadja iz namenskih niti (za take stvari že dolgo uporabljam jezik ). Zato sem se odločil zadovoljiti z domenskimi vtičnicami Unix. Na žalost to onemogoča izvajanje echo 2001:470:dead::babe 6666 > conn. To težavo sem rešil z uporabo : echo 2001:470:mrtev::babe 6666 | socat - UNIX-CONNECT:conn, socat READLINE UNIX-CONNECT:alice/in.
Izvirni nevaren protokol
TCP se uporablja kot transport: zagotavlja dostavo in naročilo. UDP ne zagotavlja ne enega ne drugega (kar bi bilo uporabno pri uporabi kriptografije), temveč podporo Python ni na voljo takoj.
Na žalost v TCP ni koncepta sporočila, le tok bajtov. Zato je treba pripraviti obliko sporočil, da jih je mogoče deliti med seboj v tej temi. Lahko se dogovorimo za uporabo znaka za premik vrstice. Za začetek je v redu, a ko začnemo šifrirati svoja sporočila, se lahko ta znak pojavi kjer koli v šifriranem besedilu. V omrežjih so zato priljubljeni protokoli, ki najprej pošljejo dolžino sporočila v bajtih. Python ima na primer xdrlib, ki vam omogoča delo s podobnim formatom .
Z branjem TCP ne bomo delovali pravilno in učinkovito - kodo bomo poenostavili. Podatke iz vtičnice beremo v neskončni zanki, dokler ne dekodiramo celotnega sporočila. JSON z XML se lahko uporablja tudi kot format za ta pristop. Toda ko je dodana kriptografija, bodo morali biti podatki podpisani in overjeni - to pa bo zahtevalo bajt za bajtom identično predstavitev objektov, ki je JSON/XML ne zagotavlja (rezultati izpisov se lahko razlikujejo).
XDR je primeren za to nalogo, vendar sem izbral ASN.1 s kodiranjem DER in knjižnico, saj bomo imeli pri roki visokokakovostne predmete, s katerimi je pogosto bolj prijetno in priročno delati. Za razliko od brez sheme , ali , bo ASN.1 samodejno preveril podatke glede na trdo kodirano shemo.
# Msg ::= CHOICE {
# text MsgText,
# handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
schema = ((
("text", MsgText()),
("handshake", MsgHandshake(expl=tag_ctxc(0))),
))
# MsgText ::= SEQUENCE {
# text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
schema = ((
("text", UTF8String(bounds=(1, MaxTextLen))),
))
# MsgHandshake ::= SEQUENCE {
# peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
schema = ((
("peerName", UTF8String(bounds=(1, 256))),
))
Prejeto sporočilo bo Msg: besedilo MsgText (zaenkrat z enim tekstovnim poljem) ali sporočilo rokovanja MsgHandshake (ki vsebuje ime sogovornika). Zdaj je videti preveč zapleteno, vendar je to temelj za prihodnost.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │MsgHandshake( IdA) │ │────────── ────────>│ │ │ │MsgHandshake(IdB) │ │<──────────────────│ │ │ │ MsgText() │ │───── MsgText() │ │ │
IM brez kriptografije
Kot sem že rekel, bo knjižnica asyncio uporabljena za vse operacije vtičnic. Naj objavimo, kaj pričakujemo ob lansiranju:
parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
"--our-name",
required=True,
help="Our peer name",
)
parser.add_argument(
"--their-names",
required=True,
help="Their peer names, comma-separated",
)
parser.add_argument(
"--bind",
default="::1",
help="Address to listen on",
)
parser.add_argument(
"--port",
type=int,
default=6666,
help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))
Nastavite svoje ime (--naše-ime alice). Vsi pričakovani sogovorniki so navedeni ločeno z vejicami (—njihova-imena bob,eve). Za vsakega od sogovornikov je ustvarjen imenik z vtičnicami Unix in korrutina za vsako in, out, stanje:
for peer_name in THEIR_NAMES:
makedirs(peer_name, mode=0o700, exist_ok=True)
out_queue = asyncio.Queue()
OUT_QUEUES[peer_name] = out_queue
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_out_processor, out_queue=out_queue),
path.join(peer_name, "out"),
))
in_queue = asyncio.Queue()
IN_QUEUES[peer_name] = in_queue
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_in_processor, in_queue=in_queue),
path.join(peer_name, "in"),
))
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_state_processor, peer_name=peer_name),
path.join(peer_name, "state"),
))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))
Sporočila, ki prihajajo od uporabnika iz vtičnice, so poslana v čakalno vrsto IN_QUEUES:
async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
while True:
text = await reader.read(MaxTextLen)
if text == b"":
break
await in_queue.put(text.decode("utf-8"))
Sporočila, ki prihajajo od sogovornikov, se pošljejo v OUT_QUEUES čakalne vrste, iz katerih se podatki zapišejo v izhodno vtičnico:
async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
while True:
text = await out_queue.get()
writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
await writer.drain()
Pri branju iz državne vtičnice program išče naslov sogovornika v slovarju PEER_ALIVE. Če povezave s sogovornikom še ni, se napiše prazna vrstica.
async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
peer_writer = PEER_ALIVES.get(peer_name)
writer.write(
b"" if peer_writer is None else (" ".join([
str(i) for i in peer_writer.get_extra_info("peername")[:2]
]).encode("utf-8") + b"n")
)
await writer.drain()
writer.close()
Pri pisanju naslova v priključno vtičnico se zažene funkcija »iniciator« povezave:
async def unixsock_conn_processor(reader, writer) -> None:
data = await reader.read(256)
writer.close()
host, port = data.decode("utf-8").split(" ")
await initiator(host=host, port=int(port))
Razmislimo o pobudniku. Najprej očitno odpre povezavo do podanega gostitelja/vrata in pošlje rokovalno sporočilo s svojim imenom:
130 async def initiator(host, port):
131 _id = repr((host, port))
132 logging.info("%s: dialing", _id)
133 reader, writer = await asyncio.open_connection(host, port)
134 # Handshake message {{{
135 writer.write(Msg(("handshake", MsgHandshake((
136 ("peerName", OUR_NAME),
137 )))).encode())
138 # }}}
139 await writer.drain()
Nato počaka na odgovor oddaljene stranke. Poskuša dekodirati dohodni odgovor s shemo Msg ASN.1. Predvidevamo, da bo celotno sporočilo poslano v enem segmentu TCP in ga bomo prejeli atomsko ob klicu .read(). Preverimo, ali smo prejeli sporočilo rokovanja.
141 # Wait for Handshake message {{{
142 data = await reader.read(256)
143 if data == b"":
144 logging.warning("%s: no answer, disconnecting", _id)
145 writer.close()
146 return
147 try:
148 msg, _ = Msg().decode(data)
149 except ASN1Error:
150 logging.warning("%s: undecodable answer, disconnecting", _id)
151 writer.close()
152 return
153 logging.info("%s: got %s message", _id, msg.choice)
154 if msg.choice != "handshake":
155 logging.warning("%s: unexpected message, disconnecting", _id)
156 writer.close()
157 return
158 # }}}
Preverimo, ali nam je prejeto ime sogovornika znano. Če ne, potem prekinemo povezavo. Preverimo, ali smo z njim že vzpostavili povezavo (sogovornik je ponovno dal ukaz za povezavo z nami) in jo prekinemo. Čakalna vrsta IN_QUEUES vsebuje nize Python z besedilom sporočila, vendar ima posebno vrednost None, ki signalizira koprogramu msg_sender, naj preneha delovati, tako da pozabi na zapisovalca, povezanega s podedovano povezavo TCP.
159 msg_handshake = msg.value
160 peer_name = str(msg_handshake["peerName"])
161 if peer_name not in THEIR_NAMES:
162 logging.warning("unknown peer name: %s", peer_name)
163 writer.close()
164 return
165 logging.info("%s: session established: %s", _id, peer_name)
166 # Run text message sender, initialize transport decoder {{{
167 peer_alive = PEER_ALIVES.pop(peer_name, None)
168 if peer_alive is not None:
169 peer_alive.close()
170 await IN_QUEUES[peer_name].put(None)
171 PEER_ALIVES[peer_name] = writer
172 asyncio.ensure_future(msg_sender(peer_name, writer))
173 # }}}
msg_sender sprejme odhodna sporočila (v čakalni vrsti iz vtičnice), jih serializira v sporočilo MsgText in jih pošlje prek povezave TCP. Vsak trenutek se lahko zlomi - to jasno prestrežemo.
async def msg_sender(peer_name: str, writer) -> None:
in_queue = IN_QUEUES[peer_name]
while True:
text = await in_queue.get()
if text is None:
break
writer.write(Msg(("text", MsgText((
("text", UTF8String(text)),
)))).encode())
try:
await writer.drain()
except ConnectionResetError:
del PEER_ALIVES[peer_name]
return
logging.info("%s: sent %d characters message", peer_name, len(text))
Na koncu iniciator vstopi v neskončno zanko branja sporočil iz vtičnice. Preveri, ali so ta sporočila besedilna sporočila, in jih postavi v čakalno vrsto OUT_QUEUES, iz katere bodo poslana v izhodno vtičnico ustreznega sogovornika. Zakaj preprosto ne naredite .read() in dekodirate sporočila? Ker je možno, da bo več sporočil od uporabnika združenih v medpomnilniku operacijskega sistema in poslanih v enem segmentu TCP. Prvega lahko dekodiramo, nato pa lahko del naslednjega ostane v medpomnilniku. V primeru kakršnih koli neobičajnih situacij zapremo povezavo TCP in ustavimo soprogram msg_sender (s pošiljanjem None v čakalno vrsto OUT_QUEUES).
174 buf = b""
175 # Wait for test messages {{{
176 while True:
177 data = await reader.read(MaxMsgLen)
178 if data == b"":
179 break
180 buf += data
181 if len(buf) > MaxMsgLen:
182 logging.warning("%s: max buffer size exceeded", _id)
183 break
184 try:
185 msg, tail = Msg().decode(buf)
186 except ASN1Error:
187 continue
188 buf = tail
189 if msg.choice != "text":
190 logging.warning("%s: unexpected %s message", _id, msg.choice)
191 break
192 try:
193 await msg_receiver(msg.value, peer_name)
194 except ValueError as err:
195 logging.warning("%s: %s", err)
196 break
197 # }}}
198 logging.info("%s: disconnecting: %s", _id, peer_name)
199 IN_QUEUES[peer_name].put(None)
200 writer.close()
66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
67 text = str(msg_text["text"])
68 logging.info("%s: received %d characters message", peer_name, len(text))
69 await OUT_QUEUES[peer_name].put(text)
Vrnimo se k glavni kodi. Po izdelavi vseh soprogramov ob zagonu programa zaženemo strežnik TCP. Za vsako vzpostavljeno povezavo ustvari odzivni koprogram.
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()
responder je podoben iniciatorju in zrcali vsa ista dejanja, vendar se zaradi poenostavitve neskončna zanka branja sporočil začne takoj. Trenutno protokol rokovanja pošilja po eno sporočilo z vsake strani, v prihodnosti pa bosta dve od iniciatorja povezave, nato pa bo besedilna sporočila mogoče poslati takoj.
72 async def responder(reader, writer):
73 _id = writer.get_extra_info("peername")
74 logging.info("%s: connected", _id)
75 buf = b""
76 msg_expected = "handshake"
77 peer_name = None
78 while True:
79 # Read until we get Msg message {{{
80 data = await reader.read(MaxMsgLen)
81 if data == b"":
82 logging.info("%s: closed connection", _id)
83 break
84 buf += data
85 if len(buf) > MaxMsgLen:
86 logging.warning("%s: max buffer size exceeded", _id)
87 break
88 try:
89 msg, tail = Msg().decode(buf)
90 except ASN1Error:
91 continue
92 buf = tail
93 # }}}
94 if msg.choice != msg_expected:
95 logging.warning("%s: unexpected %s message", _id, msg.choice)
96 break
97 if msg_expected == "text":
98 try:
99 await msg_receiver(msg.value, peer_name)
100 except ValueError as err:
101 logging.warning("%s: %s", err)
102 break
103 # Process Handshake message {{{
104 elif msg_expected == "handshake":
105 logging.info("%s: got %s message", _id, msg_expected)
106 msg_handshake = msg.value
107 peer_name = str(msg_handshake["peerName"])
108 if peer_name not in THEIR_NAMES:
109 logging.warning("unknown peer name: %s", peer_name)
110 break
111 writer.write(Msg(("handshake", MsgHandshake((
112 ("peerName", OUR_NAME),
113 )))).encode())
114 await writer.drain()
115 logging.info("%s: session established: %s", _id, peer_name)
116 peer_alive = PEER_ALIVES.pop(peer_name, None)
117 if peer_alive is not None:
118 peer_alive.close()
119 await IN_QUEUES[peer_name].put(None)
120 PEER_ALIVES[peer_name] = writer
121 asyncio.ensure_future(msg_sender(peer_name, writer))
122 msg_expected = "text"
123 # }}}
124 logging.info("%s: disconnecting", _id)
125 if msg_expected == "text":
126 IN_QUEUES[peer_name].put(None)
127 writer.close()
Varen protokol
Čas je, da zavarujemo naše komunikacije. Kaj mislimo z varnostjo in kaj želimo:
- zaupnost prenesenih sporočil;
- pristnost in celovitost poslanih sporočil - njihove spremembe morajo biti zaznane;
- zaščita pred napadi ponavljanja - zaznati je treba dejstvo manjkajočih ali ponovljenih sporočil (in se odločimo za prekinitev povezave);
- identifikacija in avtentikacija sogovornikov z vnaprej vnesenimi javnimi ključi - že prej smo se odločili, da delamo omrežje prijatelj-prijatelj. Šele po avtentikaciji bomo razumeli, s kom komuniciramo;
- razpoložljivost lastnosti (PFS) - ogrožanje našega dolgotrajnega ključa za podpisovanje ne bi smelo povzročiti zmožnosti branja vse prejšnje korespondence. Snemanje prestreženega prometa postane neuporabno;
- veljavnost/veljavnost sporočil (transport in rokovanje) le znotraj ene TCP seje. Vstavljanje pravilno podpisanih/preverjenih sporočil iz druge seje (tudi z istim sogovornikom) ne bi smelo biti mogoče;
- pasivni opazovalec ne bi smel videti identifikatorjev uporabnikov, prenesenih dolgoživih javnih ključev ali njihovih zgoščenih vrednosti. Določena anonimnost pasivnega opazovalca.
Presenetljivo je, da skoraj vsi želijo imeti ta minimum v katerem koli protokolu rokovanja in zelo malo od zgoraj navedenega je na koncu izpolnjeno za "domače" protokole. Zdaj ne bomo izumili ničesar novega. Vsekakor priporočam uporabo za gradnjo protokolov, vendar izberimo nekaj preprostejšega.
Dva najbolj priljubljena protokola sta:
- - zelo kompleksen protokol z dolgo zgodovino hroščev, zastojov, ranljivosti, slabega razmišljanja, kompleksnosti in pomanjkljivosti (vendar to nima veliko skupnega s TLS 1.3). Vendar tega ne upoštevamo, ker je preveč zapleteno.
- с — nimajo resnih kriptografskih težav, čeprav tudi niso preproste. Če berete o IKEv1 in IKEv2, potem je njun vir , protokoli ISO/IEC IS 9798-3 in SIGMA (SIGn-and-MAc) – dovolj preprosti za izvedbo v enem večeru.
Kaj je dobrega pri SIGMI, kot zadnjem členu v razvoju STS/ISO protokolov? Izpolnjuje vse naše zahteve (vključno s »skrivanjem« identifikatorjev sogovornika) in nima znanih kriptografskih težav. Je minimalističen - odstranitev vsaj enega elementa iz sporočila protokola bo povzročila njegovo nevarnost.
Pojdimo od najpreprostejšega domačega protokola k SIGMI. Najosnovnejša operacija, ki nas zanima, je : funkcija, ki obema udeležencema izpiše isto vrednost, ki se lahko uporabi kot simetrični ključ. Ne da bi se spuščali v podrobnosti: vsaka od strani ustvari efemerni (uporablja se samo znotraj ene seje) par ključev (javni in zasebni ključ), izmenja javne ključe, pokliče funkcijo sporazuma, na vhod katere posreduje svoj zasebni ključ in javni ključ. ključ sogovornika.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ══════════╗ │───────────────>│ ║PrvA, PubA = DHgen()║ │ │ ╚═ ════════ ═══════════╝ │ IdB, PubB │ ╔════════════════════╗ │<─────────── ──────│ ║PrvB, PubB = DHgen()║ │ │ ╚════════════════════╝ ─ ───┐ ╔═ ═══ ═══╧════════════╗ │ ║Ključ = DH(PrvA, PubB)║ <───┘ ╚═══════╤═ ═ ══════ ════╝ │ │ │ │
Vsak lahko skoči v sredino in javne ključe zamenja s svojimi – v tem protokolu ni avtentikacije sogovornikov. Dodajmo podpis z dolgoživimi ključi.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │IdA, PubA, znak(SignPrvA, (PubA)) │ ╔═ │───────────── ─────────── ───────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════ ═══════ ═════════════╝ │IdB, PubBB , znak(SignPrvB, (PubB)) │ ╔══════════════ ═══════ ══════╗ │<─────────────────────────── ───────────── ──│ ║SignPrvB, SignPubB = load( )║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══════════ ══════════════ ══╝ ────┐ ╔ ══════════════════ ═══╗ │ │ ║preveri( SignPubB, ...)║ │ <───┘ ║Key = DH(Pr vA, PubB) ║ │ │ ╚═════════════════════ ╝ │ │ │
Takšen podpis ne bo deloval, ker ni vezan na določeno sejo. Takšna sporočila so "primerna" tudi za seje z drugimi udeleženci. Celoten kontekst se mora naročiti. To nas prisili, da dodamo še eno sporočilo od A.
Poleg tega je ključnega pomena, da pod podpis dodate svoj identifikator, saj lahko sicer zamenjamo IdXXX in sporočilo prepodpišemo s ključem drugega znanega sogovornika. Preprečiti , je nujno, da so elementi pod podpisom na jasno določenih mestih glede na njihov pomen: če se podpiše A (PubA, PubB), se mora podpisati B (PubB, PubA). To govori tudi o pomembnosti izbire strukture in formata serializiranih podatkov. Na primer, nizi v kodiranju ASN.1 DER so razvrščeni: SET OF(PubA, PubB) bo identičen SET OF(PubB, PubA).
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │────────────────────── ──────────── ─────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═════ ═══════ ═══════════════╝ │IdB, PubB, znak(SignPrvB, (IdB, PubA, PubB)) │ ╔══════════ ═════ ════════════╗ │<─────────────────────────── ─────────── ─────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═════════ ══════ ══ ══════════╝ │ znak(SignPrvA, (IdA, PubB, PubA)) │ ╔═════════════════ ════╗ │─ ───────────────────────────────────────── ───>│ ║preveri(SignPubB, ...) ║ │ │ ║ključ = dh (prva, PUBB) ║ │ │ │
Vendar še vedno nismo »dokazali«, da smo za to sejo ustvarili isti skupni ključ. Načeloma lahko brez tega koraka – že prva prometna povezava bo neveljavna, želimo pa, da bi bili ob končanem rokovanju prepričani, da je res vse dogovorjeno. Trenutno imamo na voljo protokol ISO/IEC IS 9798-3.
Sam ustvarjeni ključ lahko podpišemo. To je nevarno, saj je možno, da pride do puščanja v uporabljenem algoritmu podpisa (čeprav bitov na podpis, vendar še vedno pušča). Zgoščeno vrednost izpeljanega ključa je mogoče podpisati, vendar je lahko uhajanje celo zgoščene vrednosti izpeljanega ključa dragoceno pri napadu s surovo silo na funkcijo izpeljave. SIGMA uporablja funkcijo MAC, ki avtentikira ID pošiljatelja.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔══════════ ═════════════════╗ │────────────────────── ──────────── ──────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚ ═══════ ════════════════════╝ │IdB, PubB, znak(SignPrvB, (PubA, PubB)), MAC(IdB) │ ╔════ ═══ │<────────────────── ─────────── ────────────────────── ─│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚════ ══════════════ ═══════ ══╝ │ │ ╔════════════ ═════════╗ │ znak(SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Ključ = DH( PrvA, PubB) ║ │────────────────────── ── ────────────────────── ─────>│ ║preveri(Key, IdB) ║ │ │ ║preveri(SignPubB, ...)║ │ │ ╚══════════════════ ══ ═╝ │ │
Za optimizacijo bodo nekateri morda želeli znova uporabiti svoje efemerne ključe (kar je seveda obžalovanja vredno za PFS). Na primer, ustvarili smo par ključev, se poskušali povezati, vendar TCP ni bil na voljo ali pa je bil prekinjen nekje na sredini protokola. Škoda je zapraviti izgubljeno entropijo in procesorske vire za nov par. Zato bomo uvedli tako imenovani piškotek - psevdonaključno vrednost, ki bo ščitila pred morebitnimi napadi naključnega ponavljanja pri ponovni uporabi efemernih javnih ključev. Zaradi vezave med piškotkom in efemernim javnim ključem se lahko javni ključ nasprotne strani odstrani iz podpisa kot nepotreben.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA, CookieA │ ╔════════ ═══════════════════╗ │──────────────────── ──────────── ───────────────────────────────────────── ─>│ ║SignPrvA, SignPubA = load( )║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═════════════════════════ ══╝ │IdB, PubB, CookieB , znak(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔══════════════════════════ ═ ╗ │< ────────────────────────────────────────── ─────────── ─────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚══════ ═════════════════════╝ │ │ ╔══════════════ ═══════╗ │ znak( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │────────────────────── ── ───────────────────────────────────────── ───────>│ ║ preveri (Key, IdB) ║ │ │ ║ preveri (SignPubB, ...)║ │ │ ╚═════════════════════╝ │ │
Nazadnje želimo pridobiti zasebnost naših sogovornikov od pasivnega opazovalca. Da bi to naredili, SIGMA predlaga, da najprej izmenjamo kratkotrajne ključe in razvijemo skupni ključ za šifriranje sporočil za avtentikacijo in identifikacijo. SIGMA opisuje dve možnosti:
- SIGMA-I - ščiti pobudnika pred aktivnimi napadi, odzivnika pred pasivnimi: pobudnik avtentikira odzivnika in če nekaj ne ustreza, potem ne izda svoje identifikacije. Obdolženec izda svojo identifikacijo, če se z njim začne aktivni protokol. Pasivni opazovalec se ne nauči ničesar;
SIGMA-R - ščiti odzivnika pred aktivnimi napadi, pobudnika pred pasivnimi. Vse je ravno obratno, vendar se v tem protokolu prenašajo že štiri sporočila rokovanja.Izbrali smo SIGMA-I, ker je bolj podoben tistemu, kar pričakujemo od znanih stvari odjemalec-strežnik: odjemalca prepozna samo overjen strežnik, strežnik pa vsi že poznajo. Poleg tega je lažje implementirati zaradi manj sporočil rokovanja. Vse, kar dodamo protokolu, je šifriranje dela sporočila in prenos identifikatorja A v šifrirani del zadnjega sporočila:
Puba, piškote │ ╔══════════ ═════════════════╗ │───────────────zgoti ───── ─────────── ────────────────────────────────────── ─dih ═════════ ════╝ │ PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔════ ═ ═══════════════ ═══════╗ │<────────────────────────── ───── ────────── ║SignP rvB, SignPubB = load()║ │ │ ║ PrvB, PubB = DHgen() ║ │ │ ╚═══════════ ════════════════╝ │ │ ╔════════ ═══════════ ══╗ │ Enc((IdA, znak( SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │─────────────────────── ────────────────── ────────── ( SignPubB, ...)║ │ │ ╚═══════════ ══════════╝ │ │
- Za podpis se uporablja GOST R algoritem z 256-bitnimi ključi.
- Za generiranje javnega ključa se uporablja VKO 34.10.
- CMAC se uporablja kot MAC. Tehnično je to poseben način delovanja blokovne šifre, opisan v GOST R 34.13-2015. Kot funkcija šifriranja za ta način − (34.12-2015).
- Zgoščena vrednost njegovega javnega ključa se uporablja kot identifikator sogovornika. Uporablja se kot hash (34.11 2012 bitov).
Po rokovanju se dogovorimo za skupni ključ. Uporabljamo ga lahko za overjeno šifriranje transportnih sporočil. Ta del je zelo preprost in težko se je zmotiti: povečamo števec sporočil, šifriramo sporočilo, overimo (MAC) števec in šifrirano besedilo, pošljemo. Ob prejemu sporočila preverimo, ali ima števec pričakovano vrednost, šifrirano besedilo overimo s števcem in ga dešifriramo. Kateri ključ naj uporabim za šifriranje sporočil rokovanja, transportnih sporočil in kako jih avtentikirati? Uporaba enega ključa za vsa ta opravila je nevarna in nespametna. Potrebno je ustvariti ključe z uporabo specializiranih funkcij (ključna funkcija izpeljave). Še enkrat, ne cepimo in si nekaj izmislimo: že dolgo poznan, dobro raziskan in nima znanih težav. Na žalost izvorna knjižnica Python nima te funkcije, zato uporabljamo plastična vrečka. HKDF interno uporablja , ki pa uporablja zgoščevalno funkcijo. Primer implementacije v Pythonu na strani Wikipedije zahteva le nekaj vrstic kode. Kot v primeru 34.10. 2012. 256 bomo kot zgoščevalno funkcijo uporabili Stribog-XNUMX. Izhod naše funkcije dogovora o ključu se bo imenoval sejni ključ, iz katerega bodo ustvarjeni manjkajoči simetrični:
kdf = Hkdf(None, key_session, hash=GOST34112012256) kdf.expand(b"handshake1-mac-identity") kdf.expand(b"handshake1-enc") kdf.expand(b"handshake1-mac") kdf.expand(b"handshake2-mac-identity") kdf.expand(b"handshake2-enc") kdf.expand(b"handshake2-mac") kdf.expand(b"transport-initiator-enc") kdf.expand(b"transport-initiator-mac") kdf.expand(b"transport-responder-enc") kdf.expand(b"transport-responder-mac")Strukture/sheme
Poglejmo, kakšne strukture ASN.1 imamo zdaj za prenos vseh teh podatkov:
class Msg(Choice): schema = (( ("text", MsgText()), ("handshake0", MsgHandshake0(expl=tag_ctxc(0))), ("handshake1", MsgHandshake1(expl=tag_ctxc(1))), ("handshake2", MsgHandshake2(expl=tag_ctxc(2))), )) class MsgText(Sequence): schema = (( ("payload", MsgTextPayload()), ("payloadMac", MAC()), )) class MsgTextPayload(Sequence): schema = (( ("nonce", Integer(bounds=(0, float("+inf")))), ("ciphertext", OctetString(bounds=(1, MaxTextLen))), )) class MsgHandshake0(Sequence): schema = (( ("cookieInitiator", Cookie()), ("pubKeyInitiator", PubKey()), )) class MsgHandshake1(Sequence): schema = (( ("cookieResponder", Cookie()), ("pubKeyResponder", PubKey()), ("ukm", OctetString(bounds=(8, 8))), ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class MsgHandshake2(Sequence): schema = (( ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class HandshakeTBE(Sequence): schema = (( ("identity", OctetString(bounds=(32, 32))), ("signature", OctetString(bounds=(64, 64))), ("identityMac", MAC()), )) class HandshakeTBS(Sequence): schema = (( ("cookieTheir", Cookie()), ("cookieOur", Cookie()), ("pubKeyOur", PubKey()), )) class Cookie(OctetString): bounds = (16, 16) class PubKey(OctetString): bounds = (64, 64) class MAC(OctetString): bounds = (16, 16)HandshakeTBS je tisto, kar bo podpisano. HandshakeTBE - kaj bo šifrirano. Opozarjam vas na polje ukm v MsgHandshake1. 34.10 VKO za še večjo randomizacijo generiranih ključev vključuje parameter UKM (user keying material) - samo dodatna entropija.
Dodajanje kriptografije kodi
Upoštevajmo samo spremembe prvotne kode, saj je ogrodje ostalo enako (pravzaprav je bila najprej napisana končna izvedba, nato pa je bila iz nje izrezana vsa kriptografija).
Ker bo avtentikacija in identifikacija sogovornikov potekala z javnimi ključi, jih je treba zdaj nekje shraniti za dalj časa. Zaradi enostavnosti uporabljamo JSON tako:
{ "our": { "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98", "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1" }, "their": { "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce", "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a" } }naš - naš par ključev, heksadecimalni zasebni in javni ključ. njihovi — imena sogovornikov in njihovi javni ključi. Spremenimo argumente ukazne vrstice in dodamo naknadno obdelavo podatkov JSON:
from pygost import gost3410 from pygost.gost34112012256 import GOST34112012256 CURVE = gost3410.GOST3410Curve( *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"] ) parser = argparse.ArgumentParser(description="GOSTIM") parser.add_argument( "--keys-gen", action="store_true", help="Generate JSON with our new keypair", ) parser.add_argument( "--keys", default="keys.json", required=False, help="JSON with our and their keys", ) parser.add_argument( "--bind", default="::1", help="Address to listen on", ) parser.add_argument( "--port", type=int, default=6666, help="Port to listen on", ) args = parser.parse_args() if args.keys_gen: prv_raw = urandom(32) pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw)) pub_raw = gost3410.pub_marshal(pub) print(json.dumps({ "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)}, "their": {}, })) exit(0) # Parse and unmarshal our and their keys {{{ with open(args.keys, "rb") as fd: _keys = json.loads(fd.read().decode("utf-8")) KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"])) _pub = hexdec(_keys["our"]["pub"]) KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub) KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest()) for peer_name, pub_raw in _keys["their"].items(): _pub = hexdec(pub_raw) KEYS[GOST34112012256(_pub).digest()] = { "name": peer_name, "pub": gost3410.pub_unmarshal(_pub), } # }}}Zasebni ključ algoritma 34.10 je naključno število. 256-bitna velikost za 256-bitne eliptične krivulje. PyGOST ne deluje z nizom bajtov, ampak z , zato je treba naš zasebni ključ (urandom(32)) pretvoriti v številko z gost3410.prv_unmarshal(). Javni ključ se določi deterministično iz zasebnega ključa z gost3410.public_key(). Javni ključ 34.10 sta dve veliki številki, ki ju je prav tako treba pretvoriti v zaporedje bajtov za lažje shranjevanje in prenos z uporabo gost3410.pub_marshal().
Po branju datoteke JSON je treba javne ključe ustrezno pretvoriti nazaj z uporabo gost3410.pub_unmarshal(). Ker bomo identifikatorje sogovornikov prejeli v obliki zgoščene vrednosti iz javnega ključa, jih lahko takoj vnaprej izračunamo in postavimo v slovar za hitro iskanje. Zgoščevanje Stribog-256 je gost34112012256.GOST34112012256(), ki v celoti ustreza vmesniku hashlib funkcij zgoščevanja.
Kako se je spremenila korutina iniciatorja? Vse je po shemi rokovanja: ustvarimo piškotek (128-bit je dovolj), kratkotrajni par ključev 34.10, ki bo uporabljen za funkcijo dogovora o ključu VKO.
395 async def initiator(host, port): 396 _id = repr((host, port)) 397 logging.info("%s: dialing", _id) 398 reader, writer = await asyncio.open_connection(host, port) 399 # Generate our ephemeral public key and cookie, send Handshake 0 message {{{ 400 cookie_our = Cookie(urandom(16)) 401 prv = gost3410.prv_unmarshal(urandom(32)) 402 pub_our = gost3410.public_key(CURVE, prv) 403 pub_our_raw = PubKey(gost3410.pub_marshal(pub_our)) 404 writer.write(Msg(("handshake0", MsgHandshake0(( 405 ("cookieInitiator", cookie_our), 406 ("pubKeyInitiator", pub_our_raw), 407 )))).encode()) 408 # }}} 409 await writer.drain()- čakamo na odgovor in dekodiramo dohodno sporočilo Msg;
- poskrbite, da boste prejeli rokovanje1;
- dekodirati efemerni javni ključ nasprotne strani in izračunati ključ seje;
- Generiramo simetrične ključe, potrebne za obdelavo TBE dela sporočila.
423 logging.info("%s: got %s message", _id, msg.choice) 424 if msg.choice != "handshake1": 425 logging.warning("%s: unexpected message, disconnecting", _id) 426 writer.close() 427 return 428 # }}} 429 msg_handshake1 = msg.value 430 # Validate Handshake message {{{ 431 cookie_their = msg_handshake1["cookieResponder"] 432 pub_their_raw = msg_handshake1["pubKeyResponder"] 433 pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw)) 434 ukm_raw = bytes(msg_handshake1["ukm"]) 435 ukm = ukm_unmarshal(ukm_raw) 436 key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001) 437 kdf = Hkdf(None, key_session, hash=GOST34112012256) 438 key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity") 439 key_handshake1_enc = kdf.expand(b"handshake1-enc") 440 key_handshake1_mac = kdf.expand(b"handshake1-mac")UKM je 64-bitno število (urandom(8)), ki prav tako zahteva deserializacijo iz svoje bajtne predstavitve z gost3410_vko.ukm_unmarshal(). Funkcija VKO za 34.10-bitni 2012/256/3410 je gost34102012256_vko.kek_XNUMX() (KEK - šifrirni ključ).
Ustvarjeni ključ seje je že 256-bitno psevdonaključno zaporedje bajtov. Zato ga je mogoče takoj uporabiti v funkcijah HKDF. Ker GOST34112012256 ustreza vmesniku hashlib, ga je mogoče takoj uporabiti v razredu Hkdf. Ne podajamo soli (prvega argumenta Hkdf), saj bo generirani ključ, zaradi minljivosti sodelujočih parov ključev, drugačen za vsako sejo in že vsebuje dovolj entropije. kdf.expand() privzeto že izdela 256-bitne ključe, ki so kasneje potrebni za Grasshopper.
Nato se preverita dela TBE in TBS dohodnega sporočila:
- MAC nad dohodnim šifriranim besedilom se izračuna in preveri;
- šifrirano besedilo je dešifrirano;
- Struktura TBE je dekodirana;
- iz njega vzamemo identifikator sogovornika in preverimo, ali nam je sploh znan;
- MAC nad tem identifikatorjem se izračuna in preveri;
- verificira se podpis nad strukturo TBS, ki vključuje piškotek obeh strani in javni efemerni ključ nasprotne strani. Podpis se overi z dolgoživim podpisnim ključem sogovornika.
441 try: 442 peer_name = validate_tbe( 443 msg_handshake1, 444 key_handshake1_mac_identity, 445 key_handshake1_enc, 446 key_handshake1_mac, 447 cookie_our, 448 cookie_their, 449 pub_their_raw, 450 ) 451 except ValueError as err: 452 logging.warning("%s: %s, disconnecting", _id, err) 453 writer.close() 454 return 455 # }}} 128 def validate_tbe( 129 msg_handshake: Union[MsgHandshake1, MsgHandshake2], 130 key_mac_identity: bytes, 131 key_enc: bytes, 132 key_mac: bytes, 133 cookie_their: Cookie, 134 cookie_our: Cookie, 135 pub_key_our: PubKey, 136 ) -> str: 137 ciphertext = bytes(msg_handshake["ciphertext"]) 138 mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext) 139 if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])): 140 raise ValueError("invalid MAC") 141 plaintext = ctr( 142 GOST3412Kuznechik(key_enc).encrypt, 143 KUZNECHIK_BLOCKSIZE, 144 ciphertext, 145 8 * b"x00", 146 ) 147 try: 148 tbe, _ = HandshakeTBE().decode(plaintext) 149 except ASN1Error: 150 raise ValueError("can not decode TBE") 151 key_sign_pub_hash = bytes(tbe["identity"]) 152 peer = KEYS.get(key_sign_pub_hash) 153 if peer is None: 154 raise ValueError("unknown identity") 155 mac_tag = mac( 156 GOST3412Kuznechik(key_mac_identity).encrypt, 157 KUZNECHIK_BLOCKSIZE, 158 key_sign_pub_hash, 159 ) 160 if not compare_digest(mac_tag, bytes(tbe["identityMac"])): 161 raise ValueError("invalid identity MAC") 162 tbs = HandshakeTBS(( 163 ("cookieTheir", cookie_their), 164 ("cookieOur", cookie_our), 165 ("pubKeyOur", pub_key_our), 166 )) 167 if not gost3410.verify( 168 CURVE, 169 peer["pub"], 170 GOST34112012256(tbs.encode()).digest(), 171 bytes(tbe["signature"]), 172 ): 173 raise ValueError("invalid signature") 174 return peer["name"]Kot sem napisal zgoraj, 34.13 opisuje različne od 34.12. Med njimi je način za ustvarjanje imitacijskih vložkov in izračune MAC. V PyGOST je to gost2015.mac(). Ta način zahteva posredovanje šifrirne funkcije (prejemanje in vrnitev enega bloka podatkov), velikost šifrirnega bloka in pravzaprav same podatke. Zakaj ne morete kodirati velikosti šifrirnega bloka? 3413/34.12/2015 opisuje ne samo 128-bitno šifro Grasshopper, ampak tudi 64-bitno - rahlo spremenjen GOST 28147-89, ustvarjen nazaj v KGB in ima še vedno enega najvišjih varnostnih pragov.
Kuznechik se inicializira s klicem gost.3412.GOST3412Kuznechik(key) in vrne objekt z metodami .encrypt()/.decrypt(), ki so primerne za prehod na funkcije 34.13. MAC se izračuna na naslednji način: gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, šifrirano besedilo). Za primerjavo izračunanega in prejetega MAC ne morete uporabiti običajne primerjave (==) bajtnih nizov, saj ta operacija izgubi primerjalni čas, kar lahko v splošnem primeru vodi do usodnih ranljivosti, kot je napade na TLS. Python ima za to posebno funkcijo, hmac.compare_digest.
Funkcija šifriranja blokov lahko šifrira samo en blok podatkov. Za večje število in tudi ne večkratnik dolžine je treba uporabiti način šifriranja. 34.13-2015 opisuje naslednje: ECB, CTR, OFB, CBC, CFB. Vsak ima svoja sprejemljiva področja uporabe in značilnosti. Na žalost še vedno nimamo standardiziranih (kot npr. CCM, OCB, GCM in podobno) – prisiljeni smo vsaj dodati MAC sami. izberem (CTR): ne zahteva oblazinjenja do velikosti bloka, lahko je vzporeden, uporablja samo šifrirno funkcijo, lahko se varno uporablja za šifriranje velikega števila sporočil (za razliko od CBC, ki ima razmeroma hitre kolizije).
Tako kot .mac() tudi .ctr() sprejema podobne vnose: šifrirano besedilo = gost3413.ctr(GOST3412Kuznechik(ključ).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). Določiti je treba inicializacijski vektor, ki je točno polovica dolžine šifrirnega bloka. Če se naš šifrirni ključ uporablja samo za šifriranje enega sporočila (čeprav iz več blokov), potem je varno nastaviti ničelni inicializacijski vektor. Za šifriranje sporočil rokovanja uporabimo vsakič ločen ključ.
Preverjanje podpisa gost3410.verify() je trivialno: posredujemo eliptično krivuljo, znotraj katere delamo (preprosto jo zabeležimo v našem protokolu GOSTIM), javni ključ podpisnika (ne pozabite, da mora biti to dvojka velika števila in ne bajtni niz), 34.11. 2012. XNUMX hash in sam podpis.
Nato v iniciatorju pripravimo in pošljemo sporočilo rokovanja na handshake2, pri čemer izvedemo enaka dejanja kot med preverjanjem, le simetrično: podpisovanje naših ključev namesto preverjanja itd.
456 # Prepare and send Handshake 2 message {{{ 457 tbs = HandshakeTBS(( 458 ("cookieTheir", cookie_their), 459 ("cookieOur", cookie_our), 460 ("pubKeyOur", pub_our_raw), 461 )) 462 signature = gost3410.sign( 463 CURVE, 464 KEY_OUR_SIGN_PRV, 465 GOST34112012256(tbs.encode()).digest(), 466 ) 467 key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity") 468 mac_tag = mac( 469 GOST3412Kuznechik(key_handshake2_mac_identity).encrypt, 470 KUZNECHIK_BLOCKSIZE, 471 bytes(KEY_OUR_SIGN_PUB_HASH), 472 ) 473 tbe = HandshakeTBE(( 474 ("identity", KEY_OUR_SIGN_PUB_HASH), 475 ("signature", OctetString(signature)), 476 ("identityMac", MAC(mac_tag)), 477 )) 478 tbe_raw = tbe.encode() 479 key_handshake2_enc = kdf.expand(b"handshake2-enc") 480 key_handshake2_mac = kdf.expand(b"handshake2-mac") 481 ciphertext = ctr( 482 GOST3412Kuznechik(key_handshake2_enc).encrypt, 483 KUZNECHIK_BLOCKSIZE, 484 tbe_raw, 485 8 * b"x00", 486 ) 487 mac_tag = mac( 488 GOST3412Kuznechik(key_handshake2_mac).encrypt, 489 KUZNECHIK_BLOCKSIZE, 490 ciphertext, 491 ) 492 writer.write(Msg(("handshake2", MsgHandshake2(( 493 ("ciphertext", OctetString(ciphertext)), 494 ("ciphertextMac", MAC(mac_tag)), 495 )))).encode()) 496 # }}} 497 await writer.drain() 498 logging.info("%s: session established: %s", _id, peer_name)Ko je seja vzpostavljena, se generirajo transportni ključi (ločen ključ za šifriranje, za avtentikacijo, za vsako stran) in Grasshopper se inicializira za dešifriranje in preverjanje MAC:
499 # Run text message sender, initialize transport decoder {{{ 500 key_initiator_enc = kdf.expand(b"transport-initiator-enc") 501 key_initiator_mac = kdf.expand(b"transport-initiator-mac") 502 key_responder_enc = kdf.expand(b"transport-responder-enc") 503 key_responder_mac = kdf.expand(b"transport-responder-mac") ... 509 asyncio.ensure_future(msg_sender( 510 peer_name, 511 key_initiator_enc, 512 key_initiator_mac, 513 writer, 514 )) 515 encrypter = GOST3412Kuznechik(key_responder_enc).encrypt 516 macer = GOST3412Kuznechik(key_responder_mac).encrypt 517 # }}} 519 nonce_expected = 0 520 # Wait for test messages {{{ 521 while True: 522 data = await reader.read(MaxMsgLen) ... 530 msg, tail = Msg().decode(buf) ... 537 try: 538 await msg_receiver( 539 msg.value, 540 nonce_expected, 541 macer, 542 encrypter, 543 peer_name, 544 ) 545 except ValueError as err: 546 logging.warning("%s: %s", err) 547 break 548 nonce_expected += 1 549 # }}}Korrutina msg_sender zdaj šifrira sporočila, preden jih pošlje prek povezave TCP. Vsako sporočilo ima monotono naraščajočo številko nonce, ki je tudi inicializacijski vektor, ko je šifrirana v načinu števca. Vsako sporočilo in blok sporočila imata zagotovljeno drugačno vrednost števca.
async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None: nonce = 0 encrypter = GOST3412Kuznechik(key_enc).encrypt macer = GOST3412Kuznechik(key_mac).encrypt in_queue = IN_QUEUES[peer_name] while True: text = await in_queue.get() if text is None: break ciphertext = ctr( encrypter, KUZNECHIK_BLOCKSIZE, text.encode("utf-8"), long2bytes(nonce, 8), ) payload = MsgTextPayload(( ("nonce", Integer(nonce)), ("ciphertext", OctetString(ciphertext)), )) mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode()) writer.write(Msg(("text", MsgText(( ("payload", payload), ("payloadMac", MAC(mac_tag)), )))).encode()) nonce += 1Dohodna sporočila obdela rutina msg_receiver, ki skrbi za preverjanje pristnosti in dešifriranje:
async def msg_receiver( msg_text: MsgText, nonce_expected: int, macer, encrypter, peer_name: str, ) -> None: payload = msg_text["payload"] if int(payload["nonce"]) != nonce_expected: raise ValueError("unexpected nonce value") mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode()) if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])): raise ValueError("invalid MAC") plaintext = ctr( encrypter, KUZNECHIK_BLOCKSIZE, bytes(payload["ciphertext"]), long2bytes(nonce_expected, 8), ) text = plaintext.decode("utf-8") await OUT_QUEUES[peer_name].put(text)Zaključek
GOSTIM je namenjen izključno uporabi v izobraževalne namene (saj ni zajet v testih)! Izvorno kodo programa lahko prenesete (Стрибог-256 хэш: 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). Как и все мои проекты, типа , , , , GOSTIM je popolnoma , razdeljen pod pogoji .
, , član , Python/Go razvijalec, glavni specialist .
Vir: www.habr.com
