"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд" Сайн байна уу, Khabro хотын оршин суугчид! Энэ ном нь утас боловсруулалтыг ойлгохыг хүсдэг аливаа хөгжүүлэгчдэд тохиромжтой. Түгээмэл програмчлалыг ойлгох нь Кафка болон Кафкагийн урсгалыг илүү сайн ойлгоход тусална. Кафкагийн хүрээг өөрөө мэдэх нь сайхан байх болно, гэхдээ энэ нь шаардлагагүй: би танд хэрэгтэй бүх зүйлийг хэлэх болно. Туршлагатай Кафка хөгжүүлэгчид болон шинэхэн хүмүүс энэ номноос Кафка урсгалын номын санг ашиглан урсгал боловсруулах сонирхолтой програмуудыг хэрхэн бүтээх талаар суралцах болно. Цувралчлал гэх мэт ойлголтуудыг аль хэдийн мэддэг дунд болон ахисан түвшний Java хөгжүүлэгчид Kafka Streams програмыг бий болгохын тулд ур чадвараа ашиглаж сурах болно. Номын эх код нь Java 8 дээр бичигдсэн бөгөөд Java 8 lambda илэрхийллийн синтаксийг ихээхэн ашигладаг тул lambda функцүүдтэй (өөр програмчлалын хэлээр ч гэсэн) хэрхэн ажиллахыг мэдэх нь ашигтай байх болно.

Ишлэл. 5.3. Нэгтгэх, цонхлох үйлдлүүд

Энэ хэсэгт бид Кафка урсгалын хамгийн ирээдүйтэй хэсгүүдийг судлах болно. Одоогоор бид Кафка урсгалын дараах асуудлуудыг авч үзсэн.

  • боловсруулах топологийг бий болгох;
  • урсгал програмуудад төлөвийг ашиглах;
  • өгөгдлийн урсгалын холболт хийх;
  • үйл явдлын урсгал (KStream) болон шинэчлэлтийн урсгал (KTable) хоорондын ялгаа.

Дараах жишээн дээр бид эдгээр бүх элементүүдийг нэгтгэх болно. Та мөн стриминг програмын өөр нэг гайхалтай боломж болох цонхны талаар суралцах болно. Бидний эхний жишээ бол энгийн нэгдэл байх болно.

5.3.1. Хувьцааны борлуулалтыг салбарын салбараар нэгтгэх

Дамжуулах өгөгдөлтэй ажиллахад нэгтгэх, бүлэглэх нь амин чухал хэрэгсэл юм. Хувь хүний ​​бүртгэлийг хүлээн авч шалгах нь ихэвчлэн хангалтгүй байдаг. Өгөгдлөөс нэмэлт мэдээллийг гаргаж авахын тулд тэдгээрийг бүлэглэх, нэгтгэх шаардлагатай.

Энэ жишээнд та хэд хэдэн салбарын компаниудын хувьцааны борлуулалтын хэмжээг хянах шаардлагатай өдрийн худалдаачны хувцас өмсөх болно. Тодруулбал, та салбар бүрт хамгийн их хувьцааны борлуулалт хийдэг таван компанийг сонирхож байна.

Ийм нэгтгэх нь өгөгдлийг хүссэн хэлбэрт (ерөнхий хэллэгээр) хөрвүүлэхийн тулд дараах хэд хэдэн алхмуудыг хийх шаардлагатай болно.

  1. Түүхий хувьцааны арилжааны мэдээллийг нийтэлсэн сэдэвт суурилсан эх сурвалжийг бий болгох. Бид StockTransaction төрлийн объектыг ShareVolume төрлийн объект руу буулгах шаардлагатай болно. Гол нь StockTransaction объект нь борлуулалтын мета өгөгдлийг агуулдаг боловч бидэнд зарагдаж буй хувьцааны тооны талаарх мэдээлэл л хэрэгтэй.
  2. ShareVolume өгөгдлийг хувьцааны тэмдгээр бүлэглэх. Тэмдгээр бүлэглэсний дараа та энэ өгөгдлийг хувьцааны борлуулалтын дүнгийн дэд нийлбэр болгон задалж болно. KStream.groupBy арга нь KGroupedStream төрлийн жишээг буцаадаг гэдгийг тэмдэглэх нь зүйтэй. Мөн та KGroupedStream.reduce аргыг залгаснаар KTable жишээг авч болно.

KGroupedStream интерфейс гэж юу вэ

KStream.groupBy болон KStream.groupByKey аргууд нь KGroupedStream-ийн жишээг буцаадаг. KGroupedStream нь товчлууруудаар бүлэглэсний дараа үйл явдлын урсгалын завсрын дүрслэл юм. Энэ нь түүнтэй шууд ажиллахад огт зориулагдаагүй болно. Үүний оронд KGroupedStream-ийг нэгтгэх үйлдлүүдэд ашигладаг бөгөөд энэ нь үргэлж KTable-ийг үүсгэдэг. Мөн нэгтгэх үйл ажиллагааны үр дүн нь KTable бөгөөд тэд улсын дэлгүүрийг ашигладаг тул үр дүнд нь бүх шинэчлэлтүүд дамжуулах шугам руу илгээгдэхгүй байж магадгүй юм.

KTable.groupBy арга нь ижил төстэй KGroupedTable-г буцаадаг - шинэчлэлтийн урсгалын завсрын төлөөлөл, түлхүүрээр дахин бүлэглэсэн.

Богино завсарлага аваад Зураг руу харцгаая. 5.9, энэ нь бидний юунд хүрснийг харуулж байна. Энэ топологи нь танд аль хэдийн танил болсон байх ёстой.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Одоо энэ топологийн кодыг харцгаая (үүнийг src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java файлаас олж болно) (Жагсаалт 5.2).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Өгөгдсөн код нь товч бөгөөд хэд хэдэн мөрөнд хийгдсэн үйлдлүүдийн их хэмжээгээр ялгагдана. Та builder.stream аргын эхний параметрт шинэ зүйлийг анзаарч магадгүй: Consumed.withOffsetResetPolicy аргыг ашиглан тохируулсан AutoOffsetReset.EARLIEST тооллын төрлийн утга (мөн LATEST байдаг). Энэ тооллогын төрлийг KStream эсвэл KTable бүрт офсет дахин тохируулах стратегийг тодорхойлоход ашиглаж болох ба тохиргооноос офсет дахин тохируулах сонголтоос давуу эрхтэй болно.

GroupByKey болон GroupBy

KStream интерфейс нь бичлэгийг бүлэглэх хоёр аргатай: GroupByKey болон GroupBy. Хоёулаа KGroupedTable-г буцаадаг тул та тэдгээрийн хооронд ямар ялгаа байгаа, алийг нь ашиглах вэ гэж гайхаж магадгүй юм.

GroupByKey аргыг KStream дахь түлхүүрүүд аль хэдийн хоосон биш үед ашигладаг. Хамгийн гол нь "дахин хуваахыг шаарддаг" гэсэн тугийг хэзээ ч суулгаагүй.

GroupBy арга нь таныг бүлэглэх түлхүүрүүдийг өөрчилсөн гэж үздэг тул дахин хуваах тугийг үнэн гэж тохируулсан. GroupBy аргын дараа нэгдэх, нэгтгэх гэх мэтийг гүйцэтгэх нь автоматаар дахин хуваалтыг бий болгоно.
Дүгнэлт: Боломжтой бол та GroupBy биш GroupByKey-г ашиглах хэрэгтэй.

mapValues ​​болон groupBy аргууд юу хийх нь ойлгомжтой тул sum() аргыг (src/main/java/bbejeck/model/ShareVolume.java-аас олно) харцгаая (Жагсаалт 5.3).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
ShareVolume.sum арга нь хувьцааны борлуулалтын нийт дүнг буцаадаг бөгөөд тооцооллын бүх хэлхээний үр дүн нь KTable объект юм. . Одоо та KTable-ийн гүйцэтгэх үүргийг ойлгож байна. ShareVolume объектууд ирэхэд харгалзах KTable объект нь хамгийн сүүлийн үеийн шинэчлэлтийг хадгалдаг. Бүх шинэчлэлтүүд өмнөх shareVolumeKTable-д тусгагдсан гэдгийг санах нь чухал боловч бүгдийг нь цааш нь илгээдэггүй.

Дараа нь бид энэ KTable-г нэгтгэн (арилжаагдсан хувьцааны тоогоор) салбар бүрт хамгийн их хувьцаа арилжсан таван компанид очиход ашигладаг. Энэ тохиолдолд бидний үйлдлүүд эхний нэгтгэхтэй төстэй байх болно.

  1. Хувь хүний ​​ShareVolume объектуудыг салбараар нь бүлэглэхийн тулд өөр groupBy үйлдлийг гүйцэтгэнэ.
  2. ShareVolume объектуудыг нэгтгэн дүгнэж эхэл. Энэ удаад нэгтгэх объект нь тогтмол хэмжээтэй тэргүүлэх дараалал юм. Энэ тогтмол хэмжээний дараалалд хамгийн их хувьцаа зарсан таван компани л үлддэг.
  3. Өмнөх догол мөрийн дарааллыг мөрийн утгад буулгаж, хамгийн их арилжаалагдсан таван хувьцааг салбараар нь тоогоор нь буцаа.
  4. Үр дүнг сэдвийн дагуу мөр хэлбэрээр бичнэ үү.

Зураг дээр. Зураг 5.10-д өгөгдлийн урсгалын топологийн графикийг үзүүлэв. Таны харж байгаагаар хоёр дахь шатны боловсруулалт нь маш энгийн.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Одоо бид энэ хоёр дахь шатны боловсруулалтын бүтцийн талаар тодорхой ойлголттой болсон тул түүний эх код руу хандаж болно (та үүнийг src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java файлаас олох болно) (Жагсаалт 5.4) .

Энэ эхлүүлэгч нь fixedQueue хувьсагчийг агуулж байна. Энэ нь захиалгын объект бөгөөд арилжаалагдсан хувьцааны бууралтын дарааллаар дээд N үр дүнг хянахад ашигладаг java.util.TreeSet-д зориулсан адаптер юм.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Та groupBy болон mapValues ​​дуудлагуудыг аль хэдийн үзсэн тул бид эдгээр рүү орохгүй (KTable.print аргыг хуучирсан тул бид KTable.toStream аргыг дуудаж байна). Гэхдээ та aggregate()-ийн KTable хувилбарыг хараахан хараагүй байгаа тул бид үүнийг хэлэлцэхэд бага зэрэг цаг зарцуулах болно.

Таны санаж байгаагаар KTable-ийг өөр болгодог зүйл нь ижил товчлууртай бичлэгүүдийг шинэчлэлт гэж үздэг. KTable нь хуучин оруулгыг шинэ зүйлээр солино. Нэгтгэх нь ижил төстэй байдлаар явагддаг: ижил түлхүүр бүхий хамгийн сүүлийн үеийн бичлэгүүдийг нэгтгэдэг. Бичлэг ирэхэд түүнийг FixedSizePriorityQueue ангийн жишээнд нэмэгч (нийтлэх аргын дуудлагын хоёр дахь параметр) ашиглан нэмэх боловч хэрэв ижил түлхүүрээр өөр бичлэг аль хэдийн байгаа бол хасагч (гурав дахь параметр) ашиглан хуучин бичлэгийг устгана. нэгтгэсэн аргын дуудлага).

Энэ нь манай нэгтгэгч, FixedSizePriorityQueue нь бүх утгыг нэг түлхүүрээр нэгтгэдэггүй, харин хамгийн их арилжаалагдсан N төрлийн хувьцааны хөдөлгөөнт нийлбэрийг хадгалдаг гэсэн үг юм. Ирж буй бичилт бүр нь өнөөг хүртэл зарагдсан нийт хувьцааны тоог агуулна. KTable нь танд аль компанийн хувьцаа хамгийн их арилжаалагдаж байгаа талаар мэдээлэл өгөх бөгөөд шинэчлэлт бүрийг нэгтгэх шаардлагагүй.

Бид хоёр чухал зүйлийг хийж сурсан:

  • KTable дахь утгыг нийтлэг түлхүүрээр бүлэглэх;
  • эдгээр бүлэглэсэн утгууд дээр цуглуулах, нэгтгэх зэрэг ашигтай үйлдлүүдийг гүйцэтгэх.

Эдгээр үйлдлүүдийг хэрхэн гүйцэтгэхийг мэдэх нь Кафка урсгалын програмаар дамжуулж буй өгөгдлийн утгыг ойлгох, ямар мэдээлэл агуулж байгааг ойлгоход чухал юм.

Бид мөн энэ номонд өмнө нь авч үзсэн гол ойлголтуудын заримыг нэгтгэсэн. 4-р бүлэгт бид алдааг тэсвэрлэх чадвартай, орон нутгийн төлөв байдал нь стриминг програмын хувьд хэр чухал болохыг авч үзсэн. Энэ бүлгийн эхний жишээ нь орон нутгийн муж яагаад ийм чухал болохыг харуулсан бөгөөд энэ нь танд аль хэдийн үзсэн мэдээллээ хянах боломжийг олгодог. Орон нутгийн хандалт нь сүлжээний саатлаас зайлсхийж, програмыг илүү гүйцэтгэлтэй, алдаа гаргахад тэсвэртэй болгодог.

Аливаа цуглуулах эсвэл нэгтгэх үйлдлийг гүйцэтгэхдээ улсын дэлгүүрийн нэрийг зааж өгөх ёстой. Бөглөх болон нэгтгэх үйлдлүүд нь KTable-ийн жишээг буцаадаг бөгөөд KTable нь хуучин үр дүнг шинэ үр дүнгээр солихын тулд төлөвийн хадгалалтыг ашигладаг. Таны харж байгаагаар бүх шинэчлэлтүүд дамжуулах шугам руу илгээгддэггүй бөгөөд нэгтгэх үйлдлүүд нь хураангуй мэдээллийг гаргахад зориулагдсан тул энэ нь чухал юм. Хэрэв та орон нутгийн мужийг ашиглахгүй бол KTable нь нэгтгэх болон нэгтгэх бүх үр дүнг дамжуулах болно.

Дараа нь бид цонхны үйл ажиллагаа гэж нэрлэгддэг тодорхой хугацааны дотор нэгтгэх гэх мэт үйлдлүүдийг гүйцэтгэх талаар авч үзэх болно.

5.3.2. Цонхны үйлдлүүд

Өмнөх хэсэгт бид гулсах эргэлт ба нэгтгэлийг танилцуулсан. Уг програм нь хувьцааны борлуулалтыг тасралтгүй нэмэгдүүлж, улмаар бирж дээр хамгийн их арилжаалагдсан таван хувьцааг нэгтгэсэн.

Заримдаа ийм тасралтгүй нэгтгэх, үр дүнг нэгтгэх шаардлагатай байдаг. Заримдаа та зөвхөн тодорхой хугацаанд үйлдлүүдийг хийх хэрэгтэй болдог. Жишээлбэл, сүүлийн 10 минутын дотор тухайн компанийн хувьцаагаар хэдэн солилцоо гүйлгээ хийгдсэнийг тооцоол. Эсвэл сүүлийн 15 минутын дотор хэдэн хэрэглэгч шинэ сурталчилгааны баннер дээр дарсан бэ. Аппликешн ийм үйлдлүүдийг олон удаа хийж болох боловч үр дүн нь зөвхөн тодорхой хугацаанд (цаг хугацааны цонх) хамаарна.

Худалдан авагчийн солилцооны гүйлгээг тоолох

Дараагийн жишээн дээр бид томоохон байгууллага эсвэл ухаалаг хувь хүн санхүүжүүлэгч гэх мэт олон арилжаачдын хувьцааны гүйлгээг хянах болно.

Ингэж мөрдөх хоёр шалтгаан байж болно. Тэдний нэг нь зах зээлийн тэргүүлэгчид юу худалдаж авч/худалдаж байгааг мэдэх хэрэгцээ юм. Хэрэв эдгээр том тоглогчид болон боловсронгуй хөрөнгө оруулагчид боломжийг олж харвал тэдний стратегийг дагах нь зүйтэй юм. Хоёрдахь шалтгаан нь хууль бус дотоод арилжааны шинж тэмдгийг илрүүлэх хүсэл юм. Үүнийг хийхийн тулд та томоохон борлуулалтын өсөлтийн чухал хэвлэлийн мэдээний хамаарлыг шинжлэх хэрэгтэй.

Ийм хяналт нь дараах алхмуудаас бүрдэнэ.

  • хувьцааны гүйлгээний сэдвээс унших урсгалыг бий болгох;
  • Ирж буй бүртгэлийг худалдан авагчийн ID болон хувьцааны тэмдгээр бүлэглэх. groupBy аргыг дуудах нь KGroupedStream ангийн жишээг буцаана;
  • KGroupedStream.windowedBy арга нь цагийн цонхоор хязгаарлагдсан өгөгдлийн урсгалыг буцаадаг бөгөөд энэ нь цонхоор нэгтгэх боломжийг олгодог. Цонхны төрлөөс хамааран TimeWindowedKStream эсвэл SessionWindowedKStream-ийг буцаана;
  • нэгтгэх үйл ажиллагааны гүйлгээний тоо. Цонхны өгөгдлийн урсгал нь энэ тоонд тодорхой бүртгэлийг харгалзан үзсэн эсэхийг тодорхойлдог;
  • боловсруулах явцад тухайн сэдэвт үр дүнг бичих эсвэл консол дээр гаргах.

Энэ програмын топологи нь энгийн боловч тодорхой зураг нь тустай байх болно. Зураг руу харцгаая. 5.11.

Дараа нь бид цонхны үйлдлийн функцууд болон холбогдох кодыг авч үзэх болно.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"

Цонхны төрлүүд

Кафка урсгалд гурван төрлийн цонх байдаг:

  • хуралдааны;
  • "унах";
  • гулсах / үсрэх.

Аль нь сонгох нь таны бизнесийн шаардлагаас хамаарна. Унтах, үсрэх цонхнууд нь цаг хугацааны хувьд хязгаарлагдмал байдаг бол сессийн цонхнууд нь хэрэглэгчийн үйл ажиллагаагаар хязгаарлагддаг - сессийн үргэлжлэх хугацаа нь зөвхөн хэрэглэгчийн хэр идэвхтэй байгаагаас тодорхойлогддог. Санаж байх ёстой гол зүйл бол бүх цонхны төрлүүд нь системийн цаг биш харин оруулгуудын огноо/цагийн тамга дээр суурилдаг.

Дараа нь бид цонхны төрөл тус бүрээр өөрсдийн топологийг хэрэгжүүлдэг. Бүрэн кодыг зөвхөн эхний жишээнд өгөх болно, бусад төрлийн цонхны хувьд цонхны үйлдлийн төрлөөс өөр юу ч өөрчлөгдөхгүй.

Сеанс цонхнууд

Сеанс цонх нь бусад бүх төрлийн цонхнуудаас эрс ялгаатай. Тэдгээр нь зөвхөн хэрэглэгчийн үйл ажиллагаа (эсвэл таны хянахыг хүсч буй байгууллагын үйл ажиллагаа) гэх мэт цаг хугацаагаар хязгаарлагддаг. Сеанс цонхнууд нь идэвхгүй байх хугацаагаар хязгаарлагддаг.

Зураг 5.12-т сессийн цонхны тухай ойлголтыг харуулав. Жижиг сесс нь зүүн талын сесстэй нийлнэ. Мөн баруун талын хуралдаан нь удаан хугацааны туршид идэвхгүй байсан тул тусдаа байх болно. Сеанс цонхнууд нь хэрэглэгчийн үйл ажиллагаан дээр суурилдаг боловч ямар сессэд хамаарахыг тодорхойлохын тулд оруулгуудын огноо/цаг тэмдгийг ашиглана.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"

Хувьцааны гүйлгээг хянахын тулд сесс цонхыг ашиглах

Биржийн гүйлгээний талаарх мэдээллийг авахын тулд сессийн цонхыг ашиглацгаая. Сеансын цонхнуудын хэрэгжилтийг Жагсаалт 5.5-д харуулав (үүнийг src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java-аас олж болно).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Та энэ топологийн ихэнх үйлдлүүдийг аль хэдийн үзсэн тул энд дахин хянаж үзэх шаардлагагүй. Гэхдээ энд бас хэд хэдэн шинэ элементүүд байгаа бөгөөд бид одоо хэлэлцэх болно.

Аливаа groupBy үйлдэл нь ихэвчлэн ямар нэгэн нэгтгэх үйлдлийг (нийтлэх, цуглуулах эсвэл тоолох) гүйцэтгэдэг. Та ажиллаж байгаа нийлбэртэй хуримтлагдсан нэгтгэх эсвэл заасан хугацааны цонхон дахь бүртгэлийг харгалзах цонхны нэгтгэлийг хийж болно.

Жагсаалт 5.5 дахь код нь сессийн цонхон дахь гүйлгээний тоог тоолдог. Зураг дээр. 5.13 Эдгээр үйлдлүүдийг алхам алхмаар шинжилнэ.

windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) руу залгаснаар бид 20 секундын идэвхгүй интервалтай, 15 минутын үргэлжлэх интервалтай сессийн цонх үүсгэдэг. 20 секундын завсарлага гэдэг нь тухайн программ нь одоогийн сесс дууссан эсвэл эхэлснээс хойш 20 секундын дотор ирсэн аливаа оруулгыг одоогийн (идэвхтэй) сессэд оруулна гэсэн үг юм.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Дараа нь бид сессийн цонхонд ямар нэгтгэх үйлдлийг гүйцэтгэхийг зааж өгнө - энэ тохиолдолд тоолно. Хэрэв ирж буй оруулга идэвхгүй байгаа цонхны гадна (огноо/цаг тэмдгийн аль нэг тал) унавал програм шинэ сесс үүсгэдэг. Хадгалах интервал гэдэг нь сессийг тодорхой хугацаанд хадгалах гэсэн үг бөгөөд сессийн идэвхгүй хугацаанаас хэтэрсэн боловч хавсаргах боломжтой хоцрогдсон өгөгдлийг авах боломжийг олгодог. Нэмж дурдахад, нэгтгэсний үр дүнд үүссэн шинэ сессийн эхлэл ба төгсгөл нь хамгийн эртний болон хамгийн сүүлийн огноо/цагийн тамгатай тохирч байна.

Сешн хэрхэн ажилладагийг харахын тулд тоолох аргын цөөн хэдэн оруулгыг харцгаая (Хүснэгт 5.1).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Бичлэгүүд ирэхэд бид ижил түлхүүртэй, дуусах хугацаа нь одоогийн огноо/цагийн тэмдэгтээс бага буюу идэвхгүй байдлын интервал, эхлэх цаг нь одоогийн огноо/цаг хугацааны тамга + идэвхгүй байдлын интервалаас их байгаа одоо байгаа сешнүүдийг хайдаг. Үүнийг харгалзан үзэхэд хүснэгтээс дөрвөн оруулга байна. 5.1-ийг дараах байдлаар нэг сесс болгон нэгтгэнэ.

1. Бичлэг 1 эхэлж ирдэг тул эхлэх цаг нь дуусах цагтай тэнцүү бөгөөд 00:00:00 байна.

2. Дараа нь 2-р оролт ирэх бөгөөд бид 23:59:55-аас өмнө дуусдаггүй, 00:00:35-аас хэтрэхгүй сессүүдийг хайж байна. Бид 1-р бичлэгийг олж, 1 ба 2-р сессүүдийг нэгтгэдэг. Бид 1-р хуралдааны эхлэх цаг (өмнөх) ба 2-р сессийн дуусах цагийг (хожим) авдаг бөгөөд ингэснээр бидний шинэ сесс 00:00:00 цагт эхэлж, 00 цагт дуусна: 00:15.

3. Бичлэг 3 ирэхэд бид 00:00:30-аас 00:01:10 цагийн хооронд сесс хайж байгаа боловч олдохгүй байна. 123-345-654,FFBE түлхүүрийн хоёр дахь сессийг 00:00:50 цагт эхэлж, дуустал нэмнэ үү.

4. Бичлэг 4 ирсэн бөгөөд бид 23:59:45-аас 00:00:25 цагийн хооронд сесс хайж байна. Энэ удаад 1 ба 2-р сесс хоёулаа олддог. Гурван сессийг нэг болгон нэгтгэж, эхлэх цаг 00:00:00, дуусах цаг 00:00:15 байна.

Энэ хэсэгт тайлбарласан зүйлээс харахад дараахь чухал нюансуудыг санах нь зүйтэй.

  • сесс нь тогтмол хэмжээтэй цонх биш юм. Хичээлийн үргэлжлэх хугацаа нь тухайн цаг хугацааны үйл ажиллагаагаар тодорхойлогддог;
  • Өгөгдөл дэх огноо/цагийн тамга нь тухайн үйл явдал одоо байгаа сесс доторх эсвэл сул зогсолтын үед хамаарах эсэхийг тодорхойлдог.

Дараа нь бид дараагийн төрлийн цонхны тухай ярих болно - "унадаг" цонхнууд.

"Хийдэг" цонхнууд

Унтаж буй цонхнууд нь тодорхой хугацааны дотор тохиолдох үйл явдлуудыг авдаг. Та 20 секунд тутамд тодорхой компанийн хувьцааны бүх гүйлгээг авах шаардлагатай тул тухайн хугацааны бүх үйл явдлыг цуглуулдаг гэж төсөөлөөд үз дээ. 20 секундын интервал дуусахад цонх эргэлдэж, 20 секундын ажиглалтын шинэ интервал руу шилжинэ. Зураг 5.14-т энэ байдлыг харуулав.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Таны харж байгаагаар сүүлийн 20 секундэд хүлээн авсан бүх үйл явдлууд цонхонд багтсан болно. Энэ хугацааны төгсгөлд шинэ цонх үүснэ.

Жагсаалт 5.6-д хувьцааны гүйлгээг 20 секунд тутамд бүртгэхийн тулд эргэлддэг цонхны хэрэглээг харуулсан кодыг харуулав (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-аас олсон).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
TimeWindows.of аргын дуудлагын энэхүү жижиг өөрчлөлтөөр та унадаг цонхыг ашиглаж болно. Энэ жишээ нь until() аргыг дууддаггүй тул анхдагч 24 цагийн хадгалах интервалыг ашиглана.

Эцэст нь, цонхны сонголтуудын сүүлчийнх нь "үсрэх" цонх руу шилжих цаг болжээ.

Гулгадаг ("үсрэх") цонхнууд

Гүйдэг/унадаг цонхнууд нь унадаг цонхтой төстэй боловч бага зэрэг ялгаатай байдаг. Гүйдэг цонхнууд нь сүүлийн үеийн үйл явдлуудыг боловсруулахын тулд шинэ цонх үүсгэхээс өмнө хугацааны төгсгөлийг хүлээхгүй. Тэд цонхны үргэлжлэх хугацаанаас бага хугацааны дараа шинэ тооцооллыг эхлүүлдэг.

Цонх унах, үсрэх хоёрын ялгааг харуулахын тулд хөрөнгийн биржийн гүйлгээг тоолох жишээ рүү буцъя. Бидний зорилго бол гүйлгээний тоог тоолох хэвээр байгаа боловч бид тоолуурыг шинэчлэхээс өмнө бүхэл бүтэн хугацааг хүлээхийг хүсэхгүй байна. Үүний оронд бид тоолуурыг богино хугацаанд шинэчлэх болно. Жишээлбэл, бид гүйлгээний тоог 20 секунд тутамд тоолох боловч тоологчийг 5 секунд тутамд шинэчилж байх болно. 5.15. Энэ тохиолдолд бид давхцсан өгөгдөл бүхий гурван үр дүнгийн цонхтой болно.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Жагсаалт 5.7-д гулсах цонхыг тодорхойлох кодыг харуулав (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-аас олдсон).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Давхардсан цонхыг advanceBy() аргад дуудлага нэмснээр үсрэх цонх руу хөрвүүлж болно. Үзүүлсэн жишээнд хадгалах интервал нь 15 минут байна.

Та энэ хэсгээс нэгтгэх үр дүнг цагийн цонхоор хэрхэн хязгаарлахыг үзсэн. Ялангуяа энэ хэсгээс дараах гурван зүйлийг санахыг хүсч байна.

  • сессийн цонхны хэмжээ нь цаг хугацаагаар биш, харин хэрэглэгчийн үйл ажиллагаагаар хязгаарлагддаг;
  • "унадаг" цонхнууд нь тодорхой хугацааны доторх үйл явдлын тоймыг харуулдаг;
  • Цонхыг үсрэх хугацааг тогтоосон боловч байнга шинэчлэгддэг бөгөөд бүх цонхонд давхардсан оруулгуудыг агуулж болно.

Дараа нь бид KTable-ийг KStream болгон хэрхэн холбох талаар сурах болно.

5.3.3. KStream болон KTable объектуудыг холбох

4-р бүлэгт бид хоёр KStream объектыг холбох талаар ярилцсан. Одоо бид KTable болон KStream-ийг хэрхэн холбохыг сурах хэрэгтэй. Энэ нь дараах энгийн шалтгааны улмаас шаардлагатай байж магадгүй юм. KStream нь бичлэгийн урсгал, KTable нь бичлэгийн шинэчлэлтийн урсгал боловч заримдаа та KTable-ийн шинэчлэлтүүдийг ашиглан бичлэгийн урсгалд нэмэлт контекст нэмэхийг хүсч болно.

Хөрөнгийн биржийн гүйлгээний тоог авч, холбогдох салбарын хөрөнгийн биржийн мэдээтэй нэгтгэж үзье. Танд байгаа кодыг өгснөөр үүнд хүрэхийн тулд юу хийх хэрэгтэйг энд харуулав.

  1. Хувьцааны гүйлгээний тооны өгөгдөл бүхий KTable объектыг KStream болгон хөрвүүлж, дараа нь түлхүүрийг энэ хувьцааны тэмдэгт тохирох салбарын салбарыг харуулсан түлхүүрээр солино.
  2. Хөрөнгийн биржийн мэдээ бүхий сэдвийн өгөгдлийг уншдаг KTable объект үүсгэ. Энэхүү шинэ KTable-ийг аж үйлдвэрийн салбараар ангилах болно.
  3. Мэдээллийн шинэчлэлтүүдийг салбарын салбараар хөрөнгийн биржийн гүйлгээний тоотой холбоно.

Одоо энэ мөрийн хөтөлбөрийг хэрхэн хэрэгжүүлэхийг харцгаая.

KTable-г KStream болгон хөрвүүлэх

KTable-г KStream болгон хөрвүүлэхийн тулд та дараах зүйлийг хийх хэрэгтэй.

  1. KTable.toStream() аргыг дуудна уу.
  2. KStream.map аргыг дуудаж, түлхүүрийг салбарын нэрээр сольж, дараа нь Windowed instance-ээс TransactionSummary объектыг татаж авна уу.

Бид эдгээр үйлдлүүдийг дараах байдлаар гинжлэх болно (кодыг src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java файлаас олж болно) (Жагсаалт 5.8).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Бид KStream.map үйлдлийг гүйцэтгэж байгаа тул буцаасан KStream жишээ нь холболтонд ашиглагдах үед автоматаар дахин хуваагдана.

Бид хөрвүүлэх процессыг дуусгасан бөгөөд дараа нь хувьцааны мэдээ унших KTable объектыг үүсгэх хэрэгтэй.

Хувьцааны мэдээний KTable үүсгэх

Аз болоход KTable объект үүсгэхэд ердөө нэг мөр код шаардлагатай (кодыг src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-аас олж болно) (Жагсаалт 5.9).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Тохиргоонд Serdes мөрийг ашигладаг тул Serde объектыг зааж өгөх шаардлагагүй гэдгийг тэмдэглэх нь зүйтэй. Мөн ХАМГИЙН ЭРТНИЙ тооллогыг ашигласнаар хүснэгтийг хамгийн эхэнд бичлэгээр дүүргэдэг.

Одоо бид эцсийн алхам руу шилжиж болно - холболт.

Мэдээний шинэчлэлтийг гүйлгээний тооны өгөгдөлтэй холбох

Холболт үүсгэх нь тийм ч хэцүү биш юм. Холбогдох салбарын хувьцааны мэдээ байхгүй тохиолдолд бид зүүн талын нэгдлийг ашиглах болно (шаардлагатай кодыг src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java файлаас олж болно) (Жагсаалт 5.10).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Энэхүү leftJoin оператор нь маш энгийн. 4-р бүлэгт заасан холболтуудаас ялгаатай нь JoinWindow аргыг ашигладаггүй, учир нь KStream-KTable холболтыг гүйцэтгэх үед KTable-д түлхүүр тус бүрт нэг л оруулга байдаг. Ийм холболт нь цаг хугацаагаар хязгаарлагдахгүй: бичлэг нь KTable-д байгаа эсвэл байхгүй байна. Гол дүгнэлт: KTable объектуудыг ашигласнаар та KStream-ийг бага давтамжтай шинэчлэгдсэн лавлагааны мэдээллээр баяжуулж чадна.

Одоо бид KStream-ээс үйл явдлуудыг баяжуулах илүү үр дүнтэй аргыг авч үзэх болно.

5.3.4. GlobalKTable объектууд

Таны харж байгаагаар үйл явдлын урсгалыг баяжуулах эсвэл контекст нэмэх шаардлагатай байна. 4-р бүлэгт та хоёр KStream объектын хоорондын холболтыг харсан бөгөөд өмнөх хэсэгт та KStream болон KTable хоёрын холболтыг харсан. Эдгээр бүх тохиолдлуудад түлхүүрүүдийг шинэ төрөл эсвэл утгаар буулгахдаа өгөгдлийн урсгалыг дахин хуваах шаардлагатай. Заримдаа дахин хуваах нь тодорхой хийгддэг, заримдаа Кафка урсгал автоматаар хийдэг. Түлхүүрүүд өөрчлөгдөж, бүртгэлүүд шинэ хэсгүүдэд орох ёстой тул дахин хуваах шаардлагатай, эс тэгвээс холболт хийх боломжгүй болно (энэ талаар 4-р бүлэгт, 4.2.4-ийн "Өгөгдлийг дахин хуваах" хэсэгт хэлэлцсэн).

Дахин хуваах нь зардалтай байдаг

Дахин хуваах нь зардал шаарддаг - завсрын сэдвүүдийг бий болгох, өөр сэдвээр давхардсан өгөгдлийг хадгалах нэмэлт нөөцийн зардал; Энэ нь мөн энэ сэдвээс бичиж, уншсаны улмаас хоцролт нэмэгдсэн гэсэн үг юм. Нэмж дурдахад, хэрэв та нэгээс олон тал эсвэл хэмжээсээр нэгдэх шаардлагатай бол холболтыг гинжээр холбож, бичлэгүүдийг шинэ түлхүүрээр буулгаж, дахин хуваах үйл явцыг дахин эхлүүлэх хэрэгтэй.

Жижиг өгөгдлийн багцтай холбогдож байна

Зарим тохиолдолд холбогдох лавлагааны өгөгдлийн хэмжээ харьцангуй бага байдаг тул тэдгээрийн бүрэн хуулбарыг зангилаа бүр дээр хялбархан багтааж болно. Иймэрхүү нөхцөл байдлын хувьд Кафка урсгал нь GlobalKTable ангиллыг өгдөг.

GlobalKTable тохиолдлууд нь өвөрмөц онцлогтой, учир нь програм нь зангилаа бүрт бүх өгөгдлийг хуулбарладаг. Бүх өгөгдөл нь зангилаа бүр дээр байгаа тул үйл явдлын урсгалыг лавлагааны өгөгдлийн түлхүүрээр хуваах шаардлагагүй бөгөөд ингэснээр бүх хуваалтуудад боломжтой болно. Та мөн GlobalKTable объектуудыг ашиглан түлхүүргүй холболт хийж болно. Энэ онцлогийг харуулахын тулд өмнөх жишээнүүдийн нэг рүү буцъя.

KStream объектуудыг GlobalKTable объектуудтай холбож байна

5.3.2-д бид худалдан авагчдын солилцооны гүйлгээг цонхоор нэгтгэсэн. Энэ нэгтгэлийн үр дүн дараах байдалтай харагдлаа.

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Эдгээр үр дүн нь зорилгодоо хүрч байсан ч хэрэглэгчийн нэр болон компанийн бүтэн нэрийг мөн харуулсан бол илүү ашигтай байх байсан. Хэрэглэгчийн нэр болон компанийн нэрийг нэмэхийн тулд та ердийн холболтыг хийж болно, гэхдээ та хоёр үндсэн зураглал хийх, дахин хуваах хэрэгтэй болно. GlobalKTable-ийн тусламжтайгаар та ийм үйл ажиллагааны зардлаас зайлсхийх боломжтой.

Үүнийг хийхийн тулд бид 5.11 жагсаалтын countStream объектыг (харгалзах кодыг src/main/java/bbejeck/chapter_5/GlobalKTableExample.java-аас олж болно) ашиглан хоёр GlobalKTable объекттой холбоно.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Бид өмнө нь энэ талаар ярилцсан тул би давтахгүй. Гэхдээ уншихад хялбар болгох үүднээс toStream().map функц дэх кодыг доторлогооны lambda илэрхийлэлийн оронд функцийн объект болгон хийсвэрлэсэн болохыг анхаарна уу.

Дараагийн алхам нь GlobalKTable-ийн хоёр тохиолдлыг зарлах явдал юм (үзүүлсэн кодыг src/main/java/bbejeck/chapter_5/GlobalKTableExample.java файлаас олж болно) (Жагсаалт 5.12).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"

Сэдвийн нэрийг тоологдсон төрлүүдийг ашиглан дүрсэлсэн болохыг анхаарна уу.

Одоо бид бүх бүрэлдэхүүн хэсгүүд бэлэн болсон тул холболтын кодыг бичих л үлдлээ (үүнийг src/main/java/bbejeck/chapter_5/GlobalKTableExample.java файлаас олж болно) (Жагсаалт 5.13).

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Хэдийгээр энэ кодонд хоёр холболт байгаа боловч тэдгээрийн үр дүнгийн аль нь ч тусад нь ашиглагддаггүй тул гинжлэгдсэн байдаг. Үр дүн нь бүх үйлдлийн төгсгөлд харагдана.

Дээрх нэгдэх үйлдлийг ажиллуулахад дараах үр дүн гарч ирнэ.

{customer='Barney, Smith' company="Exxon", transactions= 17}

Мөн чанар нь өөрчлөгдөөгүй боловч эдгээр үр дүн илүү тодорхой харагдаж байна.

Хэрэв та 4-р бүлэг хүртэл тоолж үзвэл хэд хэдэн төрлийн холболтууд ажиллаж байгааг та аль хэдийн харсан байна. Тэдгээрийг хүснэгтэд жагсаасан болно. 5.2. Энэ хүснэгтэд Кафка урсгалын 1.0.0 хувилбарын холболтын боломжуудыг тусгасан болно; Ирээдүйн хувилбаруудад ямар нэг зүйл өөрчлөгдөж магадгүй.

"Кафкагийн урсгал үйлдлээр" ном. Бодит цагийн ажилд зориулсан програмууд болон микро үйлчилгээнүүд"
Бүх зүйлийг дүгнэхийн тулд үндсэн ойлголтуудыг тоймлон хэлье: та орон нутгийн төлөвийг ашиглан үйл явдлын урсгалыг (KStream) холбож, урсгалыг (KTable) шинэчлэх боломжтой. Эсвэл лавлагааны өгөгдлийн хэмжээ хэт том биш бол GlobalKTable объектыг ашиглаж болно. GlobalKTables нь бүх хуваалтыг Kafka Streams програмын зангилаа бүрт хуулбарлаж, түлхүүр нь аль хуваалттай тохирохоос үл хамааран бүх өгөгдлийг ашиглах боломжтой болгодог.

Дараа нь бид Кафка урсгалын онцлогийг харах болно, үүний ачаар бид Кафка сэдвийн өгөгдлийг ашиглахгүйгээр төлөвийн өөрчлөлтийг ажиглах боломжтой.

5.3.5. Асуулт хийх боломжтой байдал

Бид аль хэдийн төлөвтэй холбоотой хэд хэдэн үйлдлүүдийг гүйцэтгэсэн бөгөөд үр дүнг үргэлж консол руу гаргах (хөгжүүлэх зорилгоор) эсвэл тэдгээрийг сэдэвт (үйлдвэрлэлийн зориулалтаар) бичдэг. Сэдвийн үр дүнг бичихдээ тэдгээрийг үзэхийн тулд Кафка хэрэглэгчийг ашиглах ёстой.

Эдгээр сэдвүүдийн өгөгдлийг унших нь материаллаг үзэл бодлын нэг төрөл гэж үзэж болно. Бидний зорилгын үүднээс бид Википедиагийн материаллаг үзлийн тодорхойлолтыг ашиглаж болно: “...асуулгын үр дүнг агуулсан физик мэдээллийн сангийн объект. Жишээлбэл, энэ нь алсын зайн өгөгдлийн локал хуулбар, эсвэл хүснэгтийн мөр ба/эсвэл баганын дэд хэсэг эсвэл нэгтгэх үр дүнгийн багц эсвэл нэгтгэх замаар олж авсан хураангуй хүснэгт байж болно." (https://en.wikipedia.org/wiki) /Материалжуулсан_үзэгдэл).

Kafka Streams нь мөн улсын дэлгүүрүүд дээр интерактив асуулга явуулах боломжийг олгож, эдгээр бодитой үзлийг шууд унших боломжийг танд олгоно. Улсын дэлгүүрийн асуулга нь зөвхөн унших боломжтой үйлдэл гэдгийг анхаарах нь чухал юм. Энэ нь таны аппликешн өгөгдөл боловсруулж байх үед санамсаргүй байдлаар төлөв зөрчил гаргах вий гэж санаа зовох хэрэггүй болно.

Улсын дэлгүүрүүдээс шууд асуух чадвар нь чухал юм. Энэ нь та эхлээд Кафка хэрэглэгчээс мэдээлэл авахгүйгээр хяналтын самбарын програмуудыг үүсгэх боломжтой гэсэн үг юм. Энэ нь дахин өгөгдөл бичих шаардлагагүй тул програмын үр ашгийг нэмэгдүүлдэг.

  • өгөгдлийн байршлын ачаар тэдгээрт хурдан хандах боломжтой;
  • өгөгдлийн давхардал арилсан, учир нь энэ нь гадаад санах ойд бичигдээгүй.

Таны санахыг хүсч буй гол зүйл бол та өөрийн аппликешн дотроос төлөвийг шууд асууж болно. Үүний танд олгож буй боломжуудыг үнэлж баршгүй. Кафкагаас өгөгдөл авч, бүртгэлийг өгөгдлийн санд хадгалахын оронд ижил үр дүнтэй улсын дэлгүүрүүдээс асууж болно. Улсын дэлгүүрт шууд асуулга хийх нь бага код (хэрэглэгч байхгүй), програм хангамж бага (үр дүнг хадгалахын тулд мэдээллийн сангийн хүснэгт шаардлагагүй) гэсэн үг юм.

Бид энэ бүлэгт багагүй зүйлийг тусгасан тул улсын дэлгүүрүүдийн эсрэг интерактив асуулгын талаар ярилцах болно. Гэхдээ бүү санаа зов: 9-р бүлэгт бид интерактив асуулга бүхий энгийн хяналтын самбарын програмыг үүсгэх болно. Энэ нь энэ болон өмнөх бүлгүүдийн зарим жишээг ашиглан интерактив асуулга болон тэдгээрийг Кафка урсгалын программуудад хэрхэн нэмэх боломжтойг харуулах болно.

Хураангуй

  • KStream объектууд нь мэдээллийн санд оруулахтай харьцуулахуйц үйл явдлын урсгалыг илэрхийлдэг. KTable объектууд нь мэдээллийн сангийн шинэчлэлтэй адил шинэчлэлтийн урсгалыг илэрхийлдэг. KTable объектын хэмжээ өсөхгүй, хуучин бичлэгүүд шинээр солигдоно.
  • KTable объектууд нь нэгтгэх үйл ажиллагаанд шаардлагатай.
  • Цонхны үйлдлүүдийг ашигласнаар та нэгтгэсэн өгөгдлийг цагийн хувин болгон хувааж болно.
  • GlobalKTable объектуудын ачаар та хуваалтаас үл хамааран програмын хаанаас ч лавлагааны өгөгдөлд хандах боломжтой.
  • KStream, KTable болон GlobalKTable объектуудын хооронд холболт хийх боломжтой.

Одоогийн байдлаар бид өндөр түвшний KStream DSL ашиглан Kafka Streams програмуудыг бүтээхэд анхаарлаа хандуулсан. Хэдийгээр өндөр түвшний арга барил нь танд цэвэр, товч хөтөлбөрүүдийг бий болгох боломжийг олгодог боловч үүнийг ашиглах нь ашигтай хувилбар юм. DSL KStream-тэй ажиллах нь хяналтын түвшинг бууруулах замаар кодын товчлолыг нэмэгдүүлэх гэсэн үг юм. Дараагийн бүлэгт бид доод түвшний зохицуулагчийн зангилааны API-г үзэж, бусад сонголтуудыг туршиж үзэх болно. Хөтөлбөрүүд нь өмнөхөөсөө урт байх болно, гэхдээ бид хэрэгтэй байж болох бараг бүх зохицуулагч зангилааг үүсгэх боломжтой болно.

→ Номын талаарх дэлгэрэнгүй мэдээллийг эндээс авах боломжтой нийтлэгчийн вэбсайт

→ Хаброжителигийн хувьд купон ашиглан 25% хөнгөлөлт - Кафка урсгалууд

→ Номын цаасан хувилбарын төлбөрийг төлсний дараа цахим номыг цахим шуудангаар илгээнэ.

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх