Redis Stream - Ձեր հաղորդագրությունների համակարգերի հուսալիություն և մասշտաբայնություն

Redis Stream - Ձեր հաղորդագրությունների համակարգերի հուսալիություն և մասշտաբայնություն

Redis Stream-ը նոր վերացական տվյալների տեսակ է, որը ներկայացվել է Redis-ում 5.0 տարբերակով
Հայեցակարգային առումով, Redis Stream-ը ցուցակ է, որին կարող եք ավելացնել գրառումներ: Յուրաքանչյուր մուտք ունի եզակի նույնացուցիչ: Լռելյայնորեն, ID-ն ավտոմատ կերպով ստեղծվում է և ներառում է ժամանակի դրոշմ: Հետևաբար, դուք կարող եք ժամանակի ընթացքում հարցումներ կատարել գրառումների միջակայքերում կամ ստանալ նոր տվյալներ հոսքի մեջ հայտնվելուն պես, ինչպես Unix-ի «tail -f» հրամանը կարդում է log ֆայլը և սառեցնում նոր տվյալներին սպասելիս: Նկատի ունեցեք, որ մի քանի հաճախորդներ կարող են միաժամանակ լսել շարանը, ճիշտ նույնքան «tail -f» պրոցեսներ կարող են միաժամանակ կարդալ ֆայլը՝ առանց միմյանց հետ հակասելու:

Տվյալների նոր տեսակի բոլոր առավելությունները հասկանալու համար եկեք արագ նայենք երկար ժամանակ գոյություն ունեցող Redis կառույցներին, որոնք մասամբ կրկնում են Redis Stream-ի ֆունկցիոնալությունը:

Redis PUB/SUB

Redis Pub/Sub-ը պարզ հաղորդագրությունների համակարգ է, որն արդեն ներկառուցված է ձեր հիմնական արժեքի խանութում: Այնուամենայնիվ, պարզությունը գալիս է իր գնով.

  • Եթե ​​հրատարակիչը ինչ-ինչ պատճառներով ձախողվում է, ուրեմն նա կորցնում է իր բոլոր բաժանորդներին
  • Հրատարակիչը պետք է իմանա իր բոլոր բաժանորդների ճշգրիտ հասցեն
  • Հրատարակիչը կարող է ծանրաբեռնել իր բաժանորդներին աշխատանքով, եթե տվյալներն ավելի արագ են հրապարակվում, քան դրանք մշակվում են
  • Հաղորդագրությունը հրապարակումից անմիջապես հետո ջնջվում է հրատարակչի բուֆերից՝ անկախ նրանից, թե քանի բաժանորդի է այն առաքվել և որքան արագ են նրանք կարողացել մշակել այս հաղորդագրությունը:
  • Բոլոր բաժանորդները կստանան հաղորդագրությունը միաժամանակ: Բաժանորդներն իրենք պետք է ինչ-որ կերպ պայմանավորվեն իրենց միջև նույն հաղորդագրության մշակման կարգի վերաբերյալ:
  • Չկա ներկառուցված մեխանիզմ, որը հաստատում է, որ բաժանորդը հաջողությամբ մշակել է հաղորդագրությունը: Եթե ​​բաժանորդը ստանում է հաղորդագրություն և խափանում է մշակման ընթացքում, հրատարակիչը չի իմանա այդ մասին:

Redis List

Redis List-ը տվյալների կառուցվածք է, որն աջակցում է ընթերցման հրամանների արգելափակմանը: Դուք կարող եք ավելացնել և կարդալ հաղորդագրություններ ցանկի սկզբից կամ վերջից: Այս կառուցվածքի հիման վրա դուք կարող եք լավ բուրգ կամ հերթ ստեղծել ձեր բաշխված համակարգի համար, և շատ դեպքերում դա բավարար կլինի: Հիմնական տարբերությունները Redis Pub/Sub-ից.

  • Հաղորդագրությունը առաքվում է մեկ հաճախորդի: Առաջին ընթերցմամբ արգելափակված հաճախորդը նախ կստանա տվյալները:
  • Քլինթն ինքը պետք է սկսի յուրաքանչյուր հաղորդագրության ընթերցման գործողությունը: List-ը ոչինչ չգիտի հաճախորդների մասին:
  • Հաղորդագրությունները պահվում են այնքան ժամանակ, մինչև ինչ-որ մեկը կարդա դրանք կամ բացահայտորեն չջնջի դրանք: Եթե ​​դուք կարգավորեք Redis սերվերը, որպեսզի տվյալները լցվի սկավառակի վրա, ապա համակարգի հուսալիությունը կտրուկ աճում է:

Ներածություն Stream-ին

Հոսքի մեջ մուտքի ավելացում

Թիմ XADD ավելացնում է նոր մուտք դեպի հոսք: Գրառումը պարզապես տող չէ, այն բաղկացած է մեկ կամ մի քանի բանալի-արժեք զույգերից: Այսպիսով, յուրաքանչյուր գրառում արդեն կառուցված է և նման է CSV ֆայլի կառուցվածքին:

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Վերևի օրինակում հոսքին ավելացնում ենք երկու դաշտ՝ «mystream» անունով (բանալի)՝ «sensor-id» և «ջերմաստիճան»՝ համապատասխանաբար «1234» և «19.8» արժեքներով: Որպես երկրորդ արգումենտ, հրամանը վերցնում է նույնացուցիչ, որը վերագրվելու է մուտքին. այս նույնացուցիչը եզակիորեն նույնացնում է հոսքի յուրաքանչյուր մուտքը: Այնուամենայնիվ, այս դեպքում մենք անցանք *, քանի որ ցանկանում ենք, որ Redis-ը մեզ համար նոր ID գեներացնի: Յուրաքանչյուր նոր ID-ն կավելանա: Հետևաբար, յուրաքանչյուր նոր մուտք կունենա ավելի բարձր նույնացուցիչ՝ նախորդ գրառումների համեմատությամբ:

Նույնացուցիչի ձևաչափը

Հրամանով վերադարձված մուտքի ID-ն XADD, բաղկացած է երկու մասից.

{millisecondsTime}-{sequenceNumber}

millisecondsTime — Unix-ի ժամանակը միլիվայրկյաններով (Redis սերվերի ժամանակը): Այնուամենայնիվ, եթե ընթացիկ ժամը նույնն է կամ պակաս է նախորդ ձայնագրության ժամանակից, ապա օգտագործվում է նախորդ ձայնագրության ժամանակի դրոշմը: Հետևաբար, եթե սերվերի ժամանակը վերադառնում է ժամանակի հետ, նոր նույնացուցիչը դեռևս կպահպանի ավելացման հատկությունը:

հաջորդականությունՀամար օգտագործվում է նույն միլիվայրկյանում ստեղծված գրառումների համար: հաջորդականությունՀամար կավելացվի 1-ով նախորդ մուտքի համեմատ: Քանի որ հաջորդականությունՀամար ունի 64 բիթ չափ, այնուհետև գործնականում չպետք է սահմանափակվի գրառումների քանակի սահմանափակում, որը կարող է ստեղծվել մեկ միլիվայրկյանում:

Նման նույնացուցիչների ձևաչափն առաջին հայացքից կարող է տարօրինակ թվալ: Անվստահ ընթերցողը կարող է մտածել, թե ինչու է ժամանակը նույնացուցիչի մի մասը: Պատճառն այն է, որ Redis հոսքերը աջակցում են միջակայքի հարցումները ID-ով: Քանի որ նույնացուցիչը կապված է գրառումի ստեղծման ժամանակի հետ, դա հնարավորություն է տալիս հարցումներ կատարել ժամանակային միջակայքերում: Մենք կանդրադառնանք կոնկրետ օրինակին, երբ նայենք հրամանին XRANGE.

Եթե ​​ինչ-ինչ պատճառներով օգտատերը պետք է նշի իր սեփական նույնացուցիչը, որն, օրինակ, կապված է ինչ-որ արտաքին համակարգի հետ, ապա մենք կարող ենք այն փոխանցել հրամանին. XADD *-ի փոխարեն, ինչպես ցույց է տրված ստորև.

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Խնդրում ենք նկատի ունենալ, որ այս դեպքում դուք պետք է ինքներդ վերահսկեք ID-ի ավելացումը: Մեր օրինակում նվազագույն նույնացուցիչը «0-1» է, ուստի հրամանը չի ընդունի մեկ այլ նույնացուցիչ, որը հավասար է կամ փոքր է «0-1»-ին:

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Մեկ հոսքի համար գրառումների քանակը

Հոսքի մեջ գրառումների քանակը հնարավոր է ստանալ պարզապես հրամանի միջոցով XLEN. Մեր օրինակի համար այս հրամանը կվերադարձնի հետևյալ արժեքը.

> XLEN somestream
(integer) 2

Շրջանակի հարցումներ - XRANGE և XREVRANGE

Տվյալներ ըստ տիրույթի պահանջելու համար մենք պետք է նշենք երկու նույնացուցիչ՝ միջակայքի սկիզբը և վերջը: Վերադարձված միջակայքը կներառի բոլոր տարրերը, ներառյալ սահմանները: Կան նաև երկու հատուկ նույնացուցիչներ «-» և «+», որոնք համապատասխանաբար նշանակում են հոսքի ամենափոքր (առաջին գրառումը) և ամենամեծ (վերջին գրառումը) նույնացուցիչը: Ստորև բերված օրինակը ցույց կտա հոսքի բոլոր գրառումները:

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Յուրաքանչյուր վերադարձված գրառում իրենից ներկայացնում է երկու տարրերից բաղկացած զանգված՝ նույնացուցիչ և բանալի-արժեք զույգերի ցանկ: Մենք արդեն ասացինք, որ գրառումների նույնացուցիչները կապված են ժամանակի հետ: Հետևաբար, մենք կարող ենք պահանջել որոշակի ժամանակահատվածի շրջանակ: Այնուամենայնիվ, մենք կարող ենք հարցման մեջ նշել ոչ թե ամբողջական նույնացուցիչը, այլ միայն Unix ժամանակը, բաց թողնելով դրա հետ կապված մասը. հաջորդականությունՀամար. Նույնացուցիչի բաց թողնված մասը ինքնաբերաբար կսահմանվի զրոյի տիրույթի սկզբում և առավելագույն հնարավոր արժեքի միջակայքի վերջում: Ստորև բերված է օրինակ, թե ինչպես կարող եք պահանջել երկու միլիվայրկյան տիրույթ:

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Մենք ունենք միայն մեկ մուտք այս տիրույթում, սակայն իրական տվյալների հավաքածուներում վերադարձված արդյունքը կարող է հսկայական լինել: Այս պատճառով XRANGE աջակցում է COUNT տարբերակը: Նշելով քանակությունը, մենք կարող ենք պարզապես ստանալ առաջին N գրառումները: Եթե ​​մեզ անհրաժեշտ լինի ստանալ հաջորդ N գրառումները (էջադրում), ապա կարող ենք օգտագործել վերջին ստացված ID-ն, ավելացնել այն հաջորդականությունՀամար մեկով և նորից հարցրեք. Դիտարկենք սա հետևյալ օրինակում։ Մենք սկսում ենք ավելացնել 10 տարր հետ XADD (ենթադրելով, որ mystream-ն արդեն լցված է 10 տարրով): Մեկ հրամանի համար 2 տարր ստանալով կրկնությունը սկսելու համար մենք սկսում ենք ամբողջ տիրույթից, բայց COUNT-ով, որը հավասար է 2-ի:

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Հաջորդ երկու տարրերի հետ կրկնությունը շարունակելու համար մենք պետք է ընտրենք ստացված վերջին ID-ն, այսինքն՝ 1519073279157-0, և ավելացնենք 1-ը: հաջորդականությունՀամար.
Ստացված ID-ն, այս դեպքում՝ 1519073279157-1, այժմ կարող է օգտագործվել որպես ընդգրկույթի նոր մեկնարկի արգումենտ հաջորդ զանգի համար։ XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Եվ այսպես շարունակ։ Քանի որ բարդությունը XRANGE Որոնման համար O(log(N)) է, իսկ այնուհետև O(M)՝ M տարրերը վերադարձնելու համար, ապա յուրաքանչյուր կրկնվող քայլ արագ է: Այսպիսով, օգտագործելով XRANGE հոսքերը կարող են արդյունավետ կերպով կրկնվել:

Թիմ XREVRANGE համարժեք է XRANGE, բայց տարրերը վերադարձնում է հակառակ հերթականությամբ.

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Խնդրում ենք նկատի ունենալ, որ հրամանը XREVRANGE վերցնում է միջակայքի արգումենտները սկսել և դադարեցնել հակառակ հերթականությամբ:

Նոր գրառումների ընթերցում XREAD-ի միջոցով

Հաճախ խնդիր է առաջանում բաժանորդագրվել հոսքին և ստանալ միայն նոր հաղորդագրություններ: Այս հայեցակարգը կարող է նման լինել Redis Pub/Sub-ին կամ արգելափակել Redis List-ը, սակայն կան հիմնարար տարբերություններ Redis Stream-ի օգտագործման մեջ.

  1. Յուրաքանչյուր նոր հաղորդագրություն լռելյայն հանձնվում է յուրաքանչյուր բաժանորդի: Այս պահվածքը տարբերվում է արգելափակող Redis List-ից, որտեղ նոր հաղորդագրությունը կկարդա միայն մեկ բաժանորդ:
  2. Մինչ Redis Pub/Sub-ում բոլոր հաղորդագրությունները մոռացվում են և երբեք չեն պահպանվում, Stream-ում բոլոր հաղորդագրությունները պահվում են անորոշ ժամանակով (եթե հաճախորդը բացահայտորեն ջնջում է):
  3. Redis Stream-ը թույլ է տալիս տարբերակել հաղորդագրությունների հասանելիությունը մեկ հոսքի ընթացքում: Հատուկ բաժանորդը կարող է տեսնել միայն իր անձնական հաղորդագրությունների պատմությունը:

Դուք կարող եք բաժանորդագրվել թեմայի և ստանալ նոր հաղորդագրություններ՝ օգտագործելով հրամանը XREAD. Դա մի փոքր ավելի բարդ է, քան XRANGE, ուստի նախ կսկսենք ավելի պարզ օրինակներով:

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Վերևի օրինակը ցույց է տալիս չարգելափակող ձև XREAD. Նկատի ունեցեք, որ COUNT տարբերակը պարտադիր չէ: Փաստորեն, հրամանի միակ պահանջվող տարբերակը STREAMS տարբերակն է, որը սահմանում է հոսքերի ցանկը համապատասխան առավելագույն նույնացուցիչի հետ միասին: Մենք գրել ենք «STREAMS mystream 0» - մենք ցանկանում ենք ստանալ mystream հոսքի բոլոր գրառումները «0-0»-ից մեծ նույնացուցիչով: Ինչպես տեսնում եք օրինակից, հրամանը վերադարձնում է շարանի անունը, քանի որ մենք կարող ենք միաժամանակ բաժանորդագրվել մի քանի թելերի: Մենք կարող ենք գրել, օրինակ, «STREAMS mystream otherstream 0 0»: Խնդրում ենք նկատի ունենալ, որ STREAMS տարբերակից հետո մենք պետք է նախ տրամադրենք բոլոր անհրաժեշտ հոսքերի անունները և միայն այնուհետև նույնացուցիչների ցանկը:

Այս պարզ ձևով հրամանը որևէ առանձնահատուկ բան չի անում համեմատած XRANGE. Այնուամենայնիվ, հետաքրքիրն այն է, որ մենք հեշտությամբ կարող ենք շրջվել XREAD արգելափակող հրամանին՝ նշելով BLOCK արգումենտը.

> XREAD BLOCK 0 STREAMS mystream $

Վերևի օրինակում նոր BLOCK տարբերակ է նշվում 0 միլիվայրկյան ժամկետով (սա նշանակում է անժամկետ սպասել): Ավելին, mystream հոսքի սովորական նույնացուցիչը փոխանցելու փոխարեն փոխանցվել է $ հատուկ նույնացուցիչ: Այս հատուկ նույնացուցիչը նշանակում է, որ XREAD որպես նույնացուցիչ պետք է օգտագործի mystream-ի առավելագույն նույնացուցիչը: Այսպիսով, մենք նոր հաղորդագրություններ կստանանք միայն այն պահից, երբ սկսել ենք լսել: Որոշ առումներով սա նման է Unix «tail -f» հրամանին:

Նկատի ունեցեք, որ BLOCK տարբերակն օգտագործելիս պարտադիր չէ, որ օգտագործենք $ հատուկ նույնացուցիչը: Մենք կարող ենք օգտագործել հոսքում առկա ցանկացած նույնացուցիչ: Եթե ​​թիմը կարող է անմիջապես սպասարկել մեր հարցումը՝ առանց արգելափակման, ապա դա կանի, հակառակ դեպքում՝ կարգելափակվի։

Արգելափակում XREAD կարող է նաև միանգամից մի քանի թեմաներ լսել, պարզապես անհրաժեշտ է նշել դրանց անունները: Այս դեպքում հրամանը կվերադարձնի տվյալներ ստացած առաջին հոսքի գրառումը: Տվյալ շղթայի համար արգելափակված առաջին բաժանորդը նախ տվյալներ կստանա:

Սպառողների խմբեր

Որոշ առաջադրանքներում մենք ցանկանում ենք սահմանափակել բաժանորդների մուտքը հաղորդագրություններ մեկ թեմայի շրջանակներում: Օրինակ, որտեղ դա կարող է օգտակար լինել, աշխատողների հետ հաղորդագրությունների հերթն է, որը կստանա տարբեր հաղորդագրություններ շղթայից՝ թույլ տալով հաղորդագրությունների մշակումը մասշտաբավորել:

Եթե ​​պատկերացնենք, որ մենք ունենք երեք բաժանորդ C1, C2, C3 և մի թեմա, որը պարունակում է հաղորդագրություններ 1, 2, 3, 4, 5, 6, 7, ապա հաղորդագրությունները կմատուցվեն ստորև ներկայացված գծապատկերում.

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Այս էֆեկտին հասնելու համար Redis Stream-ը օգտագործում է սպառողների խումբ կոչվող հայեցակարգ: Այս հայեցակարգը նման է կեղծ բաժանորդին, որը տվյալներ է ստանում հոսքից, բայց իրականում սպասարկվում է մի քանի բաժանորդների կողմից խմբի ներսում՝ տրամադրելով որոշակի երաշխիքներ.

  1. Յուրաքանչյուր հաղորդագրություն ուղարկվում է խմբի ներսում գտնվող մեկ այլ բաժանորդի:
  2. Խմբի ներսում բաժանորդները նույնացվում են իրենց անունով, որը մեծատառերի զգայուն տող է: Եթե ​​բաժանորդը ժամանակավորապես դուրս է գալիս խմբից, նա կարող է վերականգնվել խմբի մեջ՝ օգտագործելով իր յուրահատուկ անունը:
  3. Յուրաքանչյուր սպառողների խումբ հետևում է «առաջին չկարդացված հաղորդագրություն» հայեցակարգին: Երբ բաժանորդը պահանջում է նոր հաղորդագրություններ, նա կարող է ստանալ միայն հաղորդագրություններ, որոնք նախկինում երբևէ չեն առաքվել խմբի ներսում գտնվող որևէ բաժանորդի:
  4. Կա հրաման՝ հստակորեն հաստատելու, որ հաղորդագրությունը հաջողությամբ մշակվել է բաժանորդի կողմից: Քանի դեռ այս հրամանը չի կանչվել, պահանջվող հաղորդագրությունը կմնա «սպասող» կարգավիճակում։
  5. Սպառողների խմբի շրջանակներում յուրաքանչյուր բաժանորդ կարող է պահանջել իրեն ուղարկված, բայց դեռևս չմշակված հաղորդագրությունների պատմություն («սպասող» կարգավիճակում)

Ինչ-որ իմաստով խմբի վիճակը կարող է արտահայտվել հետևյալ կերպ.

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Այժմ ժամանակն է ծանոթանալ Սպառողների խմբի հիմնական հրամաններին, մասնավորապես.

  • XGROUP օգտագործվում է խմբեր ստեղծելու, ոչնչացնելու և կառավարելու համար
  • XREADGROUP օգտագործվում է հոսքը խմբի միջոցով կարդալու համար
  • XACK - այս հրամանը բաժանորդին թույլ է տալիս նշել հաղորդագրությունը որպես հաջող մշակված

Սպառողների խմբի ստեղծում

Ենթադրենք, որ mystream արդեն գոյություն ունի։ Այնուհետև խմբի ստեղծման հրամանը նման կլինի.

> XGROUP CREATE mystream mygroup $
OK

Խումբ ստեղծելիս պետք է նույնացուցիչ փոխանցենք, որից սկսած խումբը կստանա հաղորդագրություններ։ Եթե ​​մենք պարզապես ցանկանում ենք ստանալ բոլոր նոր հաղորդագրությունները, ապա մենք կարող ենք օգտագործել $ հատուկ նույնացուցիչը (ինչպես վերը նշված մեր օրինակում): Եթե ​​հատուկ նույնացուցիչի փոխարեն նշեք 0, ապա շղթայի բոլոր հաղորդագրությունները հասանելի կլինեն խմբին:

Այժմ, երբ խումբը ստեղծվել է, մենք կարող ենք անմիջապես սկսել հաղորդագրությունները կարդալ հրամանի միջոցով XREADGROUP. Այս հրամանը շատ նման է XREAD և աջակցում է կամընտիր BLOCK տարբերակը: Այնուամենայնիվ, կա պահանջվող GROUP տարբերակ, որը միշտ պետք է նշվի երկու արգումենտով՝ խմբի անվանումը և բաժանորդի անունը: Աջակցվում է նաև COUNT տարբերակը:

Նախքան թեման կարդալը, եկեք այնտեղ տեղադրենք մի քանի հաղորդագրություն.

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Այժմ փորձենք կարդալ այս հոսքը խմբի միջոցով.

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Վերոնշյալ հրամանը բառացիորեն կարդում է հետևյալ կերպ.

«Ես՝ բաժանորդ Ալիսս, mygroup-ի անդամ, ցանկանում եմ կարդալ mystream-ից մեկ հաղորդագրություն, որը նախկինում երբեք որևէ մեկին չի հանձնվել»:

Ամեն անգամ, երբ բաժանորդը որևէ գործողություն է կատարում խմբի վրա, նա պետք է նշի իր անունը՝ եզակի կերպով նույնացնելով իրեն խմբի ներսում: Վերոնշյալ հրամանում կա ևս մեկ շատ կարևոր մանրամասն՝ «>» հատուկ նույնացուցիչը։ Այս հատուկ նույնացուցիչը զտում է հաղորդագրությունները՝ թողնելով միայն նրանք, որոնք նախկինում երբեք չեն առաքվել:

Բացի այդ, հատուկ դեպքերում կարող եք նշել իրական նույնացուցիչ, ինչպիսին է 0-ը կամ որևէ այլ վավեր նույնացուցիչ: Այս դեպքում հրամանը XREADGROUP Ձեզ կվերադարձնի «սպասող» կարգավիճակով հաղորդագրությունների պատմություն, որոնք առաքվել են նշված բաժանորդին (Ալիս), բայց դեռ չեն հաստատվել հրամանի միջոցով: XACK.

Մենք կարող ենք ստուգել այս վարքագիծը՝ անմիջապես նշելով ID 0, առանց տարբերակի COUNT. Մենք պարզապես կտեսնենք մեկ սպասող հաղորդագրություն, այսինքն՝ Apple-ի հաղորդագրությունը.

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Այնուամենայնիվ, եթե մենք հաստատենք, որ հաղորդագրությունը հաջողությամբ մշակվել է, ապա այն այլևս չի ցուցադրվի.

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Հիմա Բոբի հերթն է ինչ-որ բան կարդալու.

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Mygroup-ի անդամ Բոբը խնդրեց ոչ ավելի, քան երկու հաղորդագրություն: Հրահանգը հաղորդում է միայն չհանձնված հաղորդագրությունների մասին «>» հատուկ նույնացուցիչի շնորհիվ: Ինչպես տեսնում եք, «խնձոր» հաղորդագրությունը չի ցուցադրվի, քանի որ այն արդեն առաքվել է Ալիսին, ուստի Բոբը ստանում է «նարնջագույն» և «ելակ»:

Այս կերպ Ալիսը, Բոբը և խմբի ցանկացած այլ բաժանորդ կարող են կարդալ տարբեր հաղորդագրություններ նույն հոսքից: Նրանք կարող են նաև կարդալ չմշակված հաղորդագրությունների իրենց պատմությունը կամ նշել հաղորդագրությունները որպես մշակված:

Մի քանի բան պետք է հիշել.

  • Հենց որ բաժանորդը հաղորդագրությունը համարի հրաման XREADGROUP, այս հաղորդագրությունը անցնում է «սպասող» վիճակի և նշանակվում է տվյալ բաժանորդին: Խմբի մյուս բաժանորդները չեն կարողանա կարդալ այս հաղորդագրությունը:
  • Բաժանորդները ավտոմատ կերպով ստեղծվում են առաջին իսկ հիշատակումից հետո, դրանք հստակ ստեղծելու կարիք չկա:
  • Հետ XREADGROUP դուք կարող եք միաժամանակ կարդալ բազմաթիվ տարբեր թեմաներից հաղորդագրություններ, սակայն դա աշխատելու համար նախ անհրաժեշտ է ստեղծել նույն անունով խմբեր յուրաքանչյուր թեմայի համար՝ օգտագործելով XGROUP

Վերականգնում ձախողումից հետո

Բաժանորդը կարող է վերականգնվել ձախողումից և վերընթերցել իր հաղորդագրությունների ցանկը «սպասող» կարգավիճակով: Այնուամենայնիվ, իրական աշխարհում բաժանորդները կարող են ի վերջո ձախողվել: Ի՞նչ է պատահում բաժանորդի խրված հաղորդագրությունների հետ, եթե բաժանորդը չի կարողանում վերականգնվել ձախողումից հետո:
Սպառողների խումբն առաջարկում է մի գործառույթ, որն օգտագործվում է հենց նման դեպքերում՝ երբ պետք է փոխել հաղորդագրությունների սեփականատիրոջը:

Առաջին բանը, որ դուք պետք է անեք, զանգահարեք հրամանը ԱՊԱՀՈՎ, որը ցուցադրում է խմբի բոլոր հաղորդագրությունները «սպասում» կարգավիճակով: Իր ամենապարզ ձևով հրամանը կանչվում է միայն երկու արգումենտով՝ թեմայի անվանումը և խմբի անվանումը.

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Թիմը ցուցադրեց չմշակված հաղորդագրությունների քանակը ամբողջ խմբի և յուրաքանչյուր բաժանորդի համար: Մենք ունենք միայն Բոբը երկու չմարված հաղորդագրություններով, քանի որ Ալիսի պահանջած միակ հաղորդագրությունը հաստատվել է XACK.

Մենք կարող ենք լրացուցիչ տեղեկություններ պահանջել՝ օգտագործելով ավելի շատ փաստարկներ.

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - նույնացուցիչների տիրույթ (կարող եք օգտագործել «-» և «+»)
{count} — առաքման փորձերի քանակը
{consumer-name} - խմբի անուն

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Այժմ մենք ունենք մանրամասներ յուրաքանչյուր հաղորդագրության համար՝ ID, բաժանորդի անունը, անգործության ժամանակը միլիվայրկյաններով և վերջապես առաքման փորձերի քանակը: Մենք ունենք երկու հաղորդագրություն Բոբից, և նրանք անգործության են մատնվել 74170458 միլիվայրկյան, մոտ 20 ժամ:

Խնդրում ենք նկատի ունենալ, որ ոչ ոք մեզ չի խանգարում ստուգել, ​​թե որն է հաղորդագրության բովանդակությունը պարզապես օգտագործելով XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Պարզապես պետք է կրկնել նույն նույնացուցիչը երկու անգամ արգումենտներում: Այժմ, երբ մենք որոշակի պատկերացում ունենք, Ալիսը կարող է որոշել, որ 20 ժամ դադարից հետո Բոբը, հավանաբար, չի վերականգնվի, և ժամանակն է հարցնել այդ հաղորդագրությունները և վերսկսել դրանք Բոբի մշակումը: Դրա համար մենք օգտագործում ենք հրամանը XՊԱՅՑՈՒՄ:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Օգտագործելով այս հրամանը, մենք կարող ենք ստանալ «օտար» հաղորդագրություն, որը դեռ չի մշակվել՝ փոխելով սեփականատերը {consumer}: Այնուամենայնիվ, մենք կարող ենք նաև տրամադրել նվազագույն անգործության ժամանակ {min-idle-time}: Սա օգնում է խուսափել մի իրավիճակից, երբ երկու հաճախորդներ փորձում են միաժամանակ փոխել նույն հաղորդագրությունների սեփականատիրոջը.

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Առաջին հաճախորդը կվերակայի անգործության ժամանակը և կավելացնի առաքման հաշվիչը: Այսպիսով, երկրորդ հաճախորդը չի կարողանա պահանջել այն:

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Հաղորդագրությունը հաջողությամբ ներկայացվել է Ալիսի կողմից, որն այժմ կարող է մշակել հաղորդագրությունը և հաստատել այն:

Վերոնշյալ օրինակից կարող եք տեսնել, որ հաջողված հարցումը վերադարձնում է հենց հաղորդագրության բովանդակությունը: Այնուամենայնիվ, դա անհրաժեշտ չէ: JUSTID տարբերակը կարող է օգտագործվել միայն հաղորդագրությունների նույնացուցիչները վերադարձնելու համար: Սա օգտակար է, եթե ձեզ չեն հետաքրքրում հաղորդագրության մանրամասները և ցանկանում եք բարձրացնել համակարգի աշխատանքը:

Առաքման հաշվիչ

Հաշվիչը, որը դուք տեսնում եք ելքում ԱՊԱՀՈՎ յուրաքանչյուր հաղորդագրության առաքումների թիվն է: Նման հաշվիչը ավելացվում է երկու եղանակով. երբ հաղորդագրությունը հաջողությամբ պահանջվում է միջոցով XՊԱՅՑՈՒՄ կամ երբ օգտագործվում է զանգ XREADGROUP.

Որոշ հաղորդագրությունների մի քանի անգամ առաքումը նորմալ է: Հիմնական բանն այն է, որ բոլոր հաղորդագրությունները ի վերջո մշակվեն: Երբեմն հաղորդագրություն մշակելիս խնդիրներ են առաջանում, քանի որ հաղորդագրությունն ինքնին վնասված է, կամ հաղորդագրության մշակումը սխալ է առաջացնում մշակողի կոդը: Այս դեպքում կարող է պարզվել, որ ոչ ոք չի կարողանա մշակել այս հաղորդագրությունը։ Քանի որ մենք ունենք առաքման փորձի հաշվիչ, մենք կարող ենք օգտագործել այս հաշվիչը՝ նման իրավիճակները հայտնաբերելու համար: Հետևաբար, երբ առաքումների քանակը հասնի ձեր նշած բարձր թվին, հավանաբար ավելի խելամիտ կլինի նման հաղորդագրություն տեղադրել մեկ այլ թեմայում և ծանուցում ուղարկել համակարգի ադմինիստրատորին:

Թեմայի վիճակը

Թիմ XINFO օգտագործվում է թեմայի և դրա խմբերի մասին տարբեր տեղեկություններ պահանջելու համար: Օրինակ, հիմնական հրամանն ունի հետևյալ տեսքը.

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Վերևի հրամանը ցուցադրում է ընդհանուր տեղեկություններ նշված հոսքի մասին: Հիմա մի փոքր ավելի բարդ օրինակ.

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Վերևի հրամանը ցուցադրում է ընդհանուր տեղեկություններ նշված թեմայի բոլոր խմբերի համար

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Վերևի հրամանը ցուցադրում է տեղեկատվություն նշված հոսքի և խմբի բոլոր բաժանորդների համար:
Եթե ​​մոռացել եք հրամանի շարահյուսությունը, պարզապես օգնություն խնդրեք հենց հրամանից.

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Հոսքի չափի սահմանաչափ

Շատ հավելվածներ չեն ցանկանում հավերժ հավաքել տվյալներ հոսքի մեջ: Հաճախ օգտակար է յուրաքանչյուր շղթայի համար թույլատրելի առավելագույն թվով հաղորդագրություններ ունենալ: Այլ դեպքերում, օգտակար է բոլոր հաղորդագրությունները շղթայից տեղափոխել մեկ այլ մշտական ​​պահեստ, երբ հասնի շղթայի նշված չափը: Դուք կարող եք սահմանափակել հոսքի չափը՝ օգտագործելով հրամանի MAXLEN պարամետրը XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

MAXLEN-ն օգտագործելիս հին գրառումներն ավտոմատ կերպով ջնջվում են, երբ հասնում են որոշակի երկարության, ուստի հոսքն ունի մշտական ​​չափ: Այնուամենայնիվ, այս դեպքում էտումը տեղի չի ունենում Redis-ի հիշողության մեջ ամենաարդյունավետ ձևով: Դուք կարող եք բարելավել իրավիճակը հետևյալ կերպ.

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Վերոհիշյալ օրինակի ~ փաստարկը նշանակում է, որ մենք պարտադիր չէ, որ հոսքի երկարությունը սահմանափակենք որոշակի արժեքով: Մեր օրինակում սա կարող է լինել 1000-ից մեծ կամ հավասար ցանկացած թիվ (օրինակ՝ 1000, 1010 կամ 1030): Մենք պարզապես հստակորեն նշել ենք, որ ցանկանում ենք, որ մեր հոսքը պահպանի առնվազն 1000 գրառում: Սա շատ ավելի արդյունավետ է դարձնում հիշողության կառավարումը Redis-ի ներսում:

Կա նաև առանձին թիմ XTRIM, որն անում է նույն բանը.

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Մշտական ​​պահեստավորում և կրկնօրինակում

Redis Stream-ը ասինխրոն կերպով կրկնօրինակվում է ստրկական հանգույցներին և պահվում այնպիսի ֆայլերում, ինչպիսիք են AOF-ը (բոլոր տվյալների նկարը) և RDB-ն (գրելու բոլոր գործողությունների մատյան): Աջակցվում է նաև սպառողների խմբերի վիճակի կրկնօրինակումը: Հետևաբար, եթե հաղորդագրությունը գլխավոր հանգույցում գտնվում է «սպասող» կարգավիճակում, ապա ստրկական հանգույցներում այս հաղորդագրությունը կունենա նույն կարգավիճակը:

Առանձին տարրերի հեռացում հոսքից

Հաղորդագրությունները ջնջելու հատուկ հրաման կա XDEL. Հրամանը ստանում է շղթայի անունը, որին հաջորդում է ջնջվող հաղորդագրության ID-ները.

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Այս հրամանն օգտագործելիս պետք է հաշվի առնել, որ իրական հիշողությունը անմիջապես չի ազատվի:

Զրոյական երկարությամբ հոսքեր

Հոսքերի և այլ Redis տվյալների կառուցվածքների միջև տարբերությունն այն է, որ երբ տվյալների այլ կառուցվածքներն այլևս չունեն տարրեր իրենց ներսում, որպես կողմնակի ազդեցություն, տվյալների կառուցվածքն ինքնին կհեռացվի հիշողությունից: Այսպիսով, օրինակ, տեսակավորված հավաքածուն ամբողջությամբ կհեռացվի, երբ ZREM զանգը հեռացնի վերջին տարրը: Փոխարենը, թելերը թույլատրվում է մնալ հիշողության մեջ նույնիսկ առանց որևէ տարրերի ներսում:

Ամփոփում

Redis Stream-ը իդեալական է հաղորդագրությունների բրոքերների, հաղորդագրությունների հերթերի, միասնական գրանցման և պատմություն պահելու զրույցի համակարգեր ստեղծելու համար:

Ինչպես մի անգամ ասացի Նիկլաուս Վիրտ, ծրագրերն ալգորիթմներ են՝ գումարած տվյալների կառուցվածքներ, և Redis-ն արդեն երկուսն էլ տալիս է։

Source: www.habr.com

Добавить комментарий