Кафкагаас хүлээн авсан үйл явдлуудыг дахин боловсруулж байна

Кафкагаас хүлээн авсан үйл явдлуудыг дахин боловсруулж байна

Хөөе Хабр.

Саяхан би туршлагаасаа хуваалцлаа Бид Кафка үйлдвэрлэгч болон хэрэглэгчдэд баталгаатай хүргэлтэд ойртохын тулд ямар параметрүүдийг ихэвчлэн ашигладаг талаар. Энэ нийтлэлд би Кафкагаас хүлээн авсан үйл явдлыг гадны системд түр зуур ашиглах боломжгүй болсны үр дүнд дахин боловсруулах ажлыг хэрхэн зохион байгуулсныг танд хэлэхийг хүсч байна.

Орчин үеийн програмууд нь маш нарийн төвөгтэй орчинд ажилладаг. Бизнесийн логик нь орчин үеийн технологийн стекээр бүрхэгдсэн, Kubernetes эсвэл OpenShift зэрэг найруулагчаар удирддаг Docker дүрс дээр ажиллаж, физик болон виртуал чиглүүлэгчийн сүлжээгээр дамжуулан бусад программууд эсвэл байгууллагын шийдлүүдтэй харилцдаг. Ийм орчинд ямар нэг зүйл үргэлж эвдэрч болзошгүй тул гадны системүүдийн аль нэг нь боломжгүй бол үйл явдлыг дахин боловсруулах нь бидний бизнесийн үйл явцын чухал хэсэг юм.

Кафкагийн өмнө ямар байсан

Төслийн өмнө бид IBM MQ-г асинхрон мессеж дамжуулахад ашиглаж байсан. Үйлчилгээг ажиллуулах явцад ямар нэгэн алдаа гарсан бол хүлээн авсан мессежийг гар аргаар задлан шинжлэхийн тулд үхсэн үсгийн дараалалд (DLQ) байрлуулж болно. DLQ нь ирж буй дарааллын хажууд үүсгэгдсэн бөгөөд мессежийг IBM MQ дотор шилжүүлсэн.

Хэрэв алдаа түр зуурын байсан бөгөөд бид үүнийг тодорхойлж чадвал (жишээлбэл, HTTP дуудлага дээрх ResourceAccessException эсвэл MongoDb хүсэлт дээрх MongoTimeoutException) дахин оролдох стратеги хүчин төгөлдөр болно. Програмын салаалсан логикоос үл хамааран анхны мессежийг илгээх саатсан системийн дараалал руу эсвэл мессежийг дахин илгээхийн тулд аль эрт хийгдсэн тусдаа програм руу шилжүүлсэн. Үүнд саатлын интервал эсвэл програмын түвшний стратегийн төгсгөлд холбогдсон мессежийн толгой хэсэгт дахин илгээх дугаар орно. Хэрэв бид стратегийн төгсгөлд хүрсэн боловч гадаад систем боломжгүй хэвээр байгаа бол мессежийг гараар задлан шинжлэх зорилгоор DLQ-д байршуулах болно.

Шийдэл олох

Интернетээс хайж байна, та дараахийг олж болно решение. Товчхондоо, саатлын интервал бүрт сэдэв үүсгэж, шаардлагатай саатал бүхий мессежүүдийг унших тал дээр Хэрэглэгчийн програмуудыг хэрэгжүүлэхийг санал болгож байна.

Кафкагаас хүлээн авсан үйл явдлуудыг дахин боловсруулж байна

Олон тооны эерэг тоймыг үл харгалзан энэ нь бүрэн амжилтанд хүрээгүй юм шиг санагдаж байна. Юуны өмнө, хөгжүүлэгч нь бизнесийн шаардлагыг хэрэгжүүлэхээс гадна тайлбарласан механизмыг хэрэгжүүлэхэд маш их цаг зарцуулах шаардлагатай болно.

Нэмж дурдахад, хэрэв Кафка кластер дээр хандалтын хяналт идэвхжсэн бол та сэдвүүдийг үүсгэж, тэдгээрт шаардлагатай хандалтыг өгөхөд хэсэг хугацаа зарцуулах шаардлагатай болно. Нэмж дурдахад, та дахин оролдох сэдэв бүрийн хувьд зөв retention.ms параметрийг сонгох хэрэгтэй бөгөөд ингэснээр мессеж дахин илгээгдэх боломжтой бөгөөд үүнээс алга болохгүй. Хэрэгжилт болон хандалтын хүсэлтийг одоо байгаа эсвэл шинэ үйлчилгээ болгонд давтах шаардлагатай болно.

Мессежийг дахин боловсруулахад ерөнхийдөө, ялангуяа хавар-кафка ямар механизмууд бидэнд байгааг харцгаая. Spring-kafka нь Spring-retry-ээс шилжилтийн хамааралтай бөгөөд өөр өөр BackOffPolicies-ийг удирдахад хийсвэрлэл өгдөг. Энэ нь нэлээд уян хатан хэрэгсэл боловч түүний сул тал нь програмын санах ойд дахин илгээх зорилгоор мессежийг хадгалах явдал юм. Энэ нь шинэчлэлт эсвэл үйлдлийн алдааны улмаас програмыг дахин эхлүүлэх нь дахин боловсруулагдахыг хүлээж буй бүх мессежийг алдахад хүргэнэ гэсэн үг юм. Энэ цэг нь манай системийн хувьд нэн чухал тул бид цаашид авч үзээгүй.

Spring-kafka өөрөө ContainerAwareErrorHandler-ийн хэд хэдэн хэрэгжилтийг хангадаг SeekToCurrentErrorHandler, үүний тусламжтайгаар та алдаа гарсан тохиолдолд офсет шилжүүлэхгүйгээр дараа нь мессежийг боловсруулах боломжтой. Spring-kafka 2.3 хувилбараас эхлэн BackOffPolicy-г тохируулах боломжтой болсон.

Энэ арга нь дахин боловсруулсан мессежийг програмыг дахин эхлүүлэх үед амьд үлдэх боломжийг олгодог боловч DLQ механизм байхгүй хэвээр байна. Бид 2019 оны эхээр энэ сонголтыг сонгосон бөгөөд DLQ шаардлагагүй гэж өөдрөгөөр итгэж (бид азтай байсан бөгөөд програмыг ийм дахин боловсруулах системээр хэдэн сар ажиллуулсны дараа үнэндээ хэрэггүй байсан). Түр зуурын алдаа нь SeekToCurrentErrorHandler-г асаахад хүргэсэн. Үлдсэн алдаанууд нь бүртгэлд хэвлэгдэж, офсет гарч, дараагийн мессежээр боловсруулалт үргэлжилсэн.

Эцсийн шийдвэр

SeekToCurrentErrorHandler дээр суурилсан хэрэгжилт нь биднийг мессежийг дахин илгээх механизмаа хөгжүүлэхэд түлхэц болсон.

Юуны өмнө бид байгаа туршлагыг ашиглаж, хэрэглээний логикоос хамааран өргөжүүлэхийг хүссэн. Шугаман логик хэрэглээний хувьд дахин оролдох стратегид заасан богино хугацаанд шинэ мессежүүдийг уншихаа зогсоох нь оновчтой байх болно. Бусад хэрэглээний хувьд би дахин оролдох стратегийг хэрэгжүүлэх ганц цэгтэй байхыг хүссэн. Нэмж дурдахад энэ ганц цэг нь хоёр хандлагын хувьд DLQ функцтэй байх ёстой.

Дахин оролдох стратеги нь өөрөө програмд ​​хадгалагдах ёстой бөгөөд энэ нь түр зуурын алдаа гарах үед дараагийн интервалыг сэргээх үүрэгтэй.

Шугаман логик програмын хэрэглэгчийг зогсоох

Spring-kafka-тай ажиллах үед Хэрэглэгчийг зогсоох код дараах байдалтай байж болно.

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Жишээн дээр retryAt нь MessageListenerContainer ажиллаж байгаа бол дахин эхлүүлэх цаг юм. Дахин эхлүүлэх нь TaskScheduler дээр эхлүүлсэн тусдаа хэлхээнд хийгдэх бөгөөд хэрэгжилтийг хавар хангадаг.

Бид retryAt утгыг дараах байдлаар олно.

  1. Дахин дуудлагын тоолуурын утгыг хайж байна.
  2. Тоолуурын утга дээр үндэслэн дахин оролдох стратеги дахь одоогийн саатлын интервалыг хайдаг. Стратегийг програмд ​​өөрөө зарласан бөгөөд бид үүнийг хадгалахын тулд JSON форматыг сонгосон.
  3. JSON массиваас олдсон интервал нь дараа нь дахин боловсруулалт хийх шаардлагатай секундын тоог агуулна. RetryAt-ийн утгыг үүсгэхийн тулд энэ секундын тоог одоогийн цаг дээр нэмдэг.
  4. Хэрэв интервал олдохгүй бол retryAt-ийн утга хоосон байх ба гар аргаар задлан шинжлэх зорилгоор мессежийг DLQ руу илгээнэ.

Энэ аргын тусламжтайгаар одоо боловсруулж буй мессеж бүрийн давтагдсан дуудлагын тоог, жишээлбэл, програмын санах ойд хадгалахад л үлддэг. Шугаман логик програм нь боловсруулалтыг бүхэлд нь зохицуулах боломжгүй тул дахин оролдлогын тоог санах ойд хадгалах нь энэ аргын хувьд чухал биш юм. Хаврын дахин оролдлого хийхээс ялгаатай нь програмыг дахин эхлүүлэх нь бүх мессежийг дахин боловсруулахад хүргэхгүй, харин зүгээр л стратегийг дахин эхлүүлэх болно.

Энэ арга нь маш их ачааллын улмаас ажиллах боломжгүй байж болох гадаад системийн ачааллыг арилгахад тусалдаг. Өөрөөр хэлбэл, дахин боловсруулахаас гадна хэв маягийн хэрэгжилтэд хүрсэн таслагч.

Манай тохиолдолд алдааны босго нь зөвхөн 1 бөгөөд сүлжээний түр зуурын тасалдлаас болж системийн зогсолтыг багасгахын тулд бид хоцрогдлын интервал багатай маш нарийн ширхэгтэй дахин оролдох стратегийг ашигладаг. Энэ нь бүх бүлгийн програмуудад тохиромжгүй байж болох тул алдааны босго ба интервалын утгын хоорондын хамаарлыг системийн шинж чанарт үндэслэн сонгох ёстой.

Тодорхой бус логик бүхий програмуудын мессежийг боловсруулах тусдаа програм

Ийм програм руу мессеж илгээдэг кодын жишээг (Дахин оролдох) RETRY_AT цаг болоход DESTINATION сэдэв рүү дахин илгээх болно.


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Толгой хэсэгт маш их мэдээлэл дамждагийг жишээ харуулж байна. RETRY_AT-ийн утгыг Consumer stop-ээр дамжуулан дахин оролдох механизмтай ижил аргаар олно. DESTINATION болон RETRY_AT-аас гадна бид:

  • GROUP_ID, үүгээр бид гар аргаар дүн шинжилгээ хийх, хялбаршуулсан хайлт хийх зорилгоор мессежүүдийг бүлэглэнэ.
  • ORIGINAL_PARTITION нь ижил Хэрэглэгчийг дахин боловсруулахад байлгахыг оролдох. Энэ параметр нь null байж болох бөгөөд энэ тохиолдолд шинэ хуваалтыг эх мессежийн record.key() товчлуурыг ашиглан авна.
  • Дахин оролдох стратегийг дагахын тулд COUNTER утгыг шинэчилсэн.
  • SEND_TO нь RETRY_AT-д хүрэх үед мессежийг дахин боловсруулахаар илгээсэн эсвэл DLQ-д байрлуулсан эсэхийг илтгэх тогтмол юм.
  • ШАЛТГААН - мессеж боловсруулах тасалдсан шалтгаан.

Retryer нь PostgreSQL-д дахин илгээх, гараар задлан шинжлэх зорилгоор мессежүүдийг хадгалдаг. Цаг хэмжигч нь RETRY_AT ашиглан мессежийг хайж олох ажлыг эхлүүлж, тэдгээрийг Record.key() түлхүүрээр DESTINATION сэдвийн ORIGINAL_PARTITION хэсэг рүү буцааж илгээдэг.

Илгээсний дараа мессежүүд PostgreSQL-ээс устгагдана. Мессежийг гараар задлан шинжлэх нь REST API-ээр дамжуулан Retryer-тэй харьцдаг энгийн UI-д тохиолддог. Үүний гол онцлог нь DLQ-аас мессежийг дахин илгээх, устгах, алдааны мэдээллийг үзэх, мессеж хайх, жишээ нь алдааны нэрээр хайх явдал юм.

Манай кластерууд дээр хандалтын хяналт идэвхжсэн тул Retryer-ийн сонсож буй сэдэв рүү нэвтрэх хүсэлтийг нэмж өгөх шаардлагатай бөгөөд Retryer-д DESTINATION сэдэв рүү бичихийг зөвшөөрөх шаардлагатай. Энэ нь тохиромжгүй, гэхдээ интервалын сэдвийн арга барилаас ялгаатай нь бид үүнийг удирдах бүрэн хэмжээний DLQ болон UI-тай.

Ирж буй сэдвийг хэрэглээний хэд хэдэн бүлгүүд уншиж, өөр логикийг хэрэгжүүлдэг тохиолдол байдаг. Эдгээр аппликешнүүдийн аль нэгийг нь дахин оролдохоор дамжуулан мессежийг дахин боловсруулснаар нөгөө нь давхардсан болно. Үүнээс хамгаалахын тулд бид дахин боловсруулах тусдаа сэдвийг бий болгодог. Ирж буй болон дахин оролдох сэдвүүдийг нэг хэрэглэгч ямар ч хязгаарлалтгүйгээр унших боломжтой.

Кафкагаас хүлээн авсан үйл явдлуудыг дахин боловсруулж байна

Анхдагч байдлаар энэ арга нь таслагчийн функцийг хангадаггүй ч үүнийг ашиглан програмд ​​нэмж болно хавар-үүл-нетфликс эсвэл шинэ хаврын үүл таслагч, гадны үйлчилгээ дуудагдсан газруудыг зохих хийсвэрлэлд боож. Үүнээс гадна стратеги сонгох боломжтой болно задгай толгой загвар, энэ нь бас ашигтай байж болно. Жишээлбэл, spring-cloud-netflix-д энэ нь thread-ийн сан эсвэл семафор байж болно.

дүгнэлт

Үүний үр дүнд бид ямар нэгэн гадны систем түр ажиллахгүй бол мессежийн боловсруулалтыг давтах боломжийг олгодог тусдаа програмтай болсон.

Програмын гол давуу талуудын нэг нь үүнийг ижил Кафка кластер дээр ажилладаг гадаад системүүд, тэдгээрийн талд мэдэгдэхүйц өөрчлөлтгүйгээр ашиглах боломжтой юм! Ийм аппликейшн нь зөвхөн дахин оролдох сэдэв рүү нэвтэрч, хэдэн Кафкагийн толгой хэсгийг бөглөж, Retryer рүү мессеж илгээхэд л хангалттай. Нэмэлт дэд бүтцийг босгох шаардлагагүй. Мөн програмаас Retryer руу болон буцах мессежийн тоог багасгахын тулд бид шугаман логик бүхий програмуудыг тодорхойлж, Хэрэглэгчийн зогсоолоор дамжуулан дахин боловсруулав.

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх