Redis Stream-ը նոր վերացական տվյալների տեսակ է, որը ներկայացվել է Redis-ում 5.0 տարբերակով
Հայեցակարգային առումով, Redis Stream-ը ցուցակ է, որին կարող եք ավելացնել գրառումներ: Յուրաքանչյուր մուտք ունի եզակի նույնացուցիչ: Լռելյայնորեն, ID-ն ավտոմատ կերպով ստեղծվում է և ներառում է ժամանակի դրոշմ: Հետևաբար, դուք կարող եք ժամանակի ընթացքում հարցումներ կատարել գրառումների միջակայքերում կամ ստանալ նոր տվյալներ հոսքի մեջ հայտնվելուն պես, ինչպես Unix-ի «tail -f» հրամանը կարդում է log ֆայլը և սառեցնում նոր տվյալներին սպասելիս: Նկատի ունեցեք, որ մի քանի հաճախորդներ կարող են միաժամանակ լսել շարանը, ճիշտ նույնքան «tail -f» պրոցեսներ կարող են միաժամանակ կարդալ ֆայլը՝ առանց միմյանց հետ հակասելու:
Տվյալների նոր տեսակի բոլոր առավելությունները հասկանալու համար եկեք արագ նայենք երկար ժամանակ գոյություն ունեցող Redis կառույցներին, որոնք մասամբ կրկնում են Redis Stream-ի ֆունկցիոնալությունը:
Redis PUB/SUB
Redis Pub/Sub-ը պարզ հաղորդագրությունների համակարգ է, որն արդեն ներկառուցված է ձեր հիմնական արժեքի խանութում: Այնուամենայնիվ, պարզությունը գալիս է իր գնով.
- Եթե հրատարակիչը ինչ-ինչ պատճառներով ձախողվում է, ուրեմն նա կորցնում է իր բոլոր բաժանորդներին
- Հրատարակիչը պետք է իմանա իր բոլոր բաժանորդների ճշգրիտ հասցեն
- Հրատարակիչը կարող է ծանրաբեռնել իր բաժանորդներին աշխատանքով, եթե տվյալներն ավելի արագ են հրապարակվում, քան դրանք մշակվում են
- Հաղորդագրությունը հրապարակումից անմիջապես հետո ջնջվում է հրատարակչի բուֆերից՝ անկախ նրանից, թե քանի բաժանորդի է այն առաքվել և որքան արագ են նրանք կարողացել մշակել այս հաղորդագրությունը:
- Բոլոր բաժանորդները կստանան հաղորդագրությունը միաժամանակ: Բաժանորդներն իրենք պետք է ինչ-որ կերպ պայմանավորվեն իրենց միջև նույն հաղորդագրության մշակման կարգի վերաբերյալ:
- Չկա ներկառուցված մեխանիզմ, որը հաստատում է, որ բաժանորդը հաջողությամբ մշակել է հաղորդագրությունը: Եթե բաժանորդը ստանում է հաղորդագրություն և խափանում է մշակման ընթացքում, հրատարակիչը չի իմանա այդ մասին:
Redis List
Redis List-ը տվյալների կառուցվածք է, որն աջակցում է ընթերցման հրամանների արգելափակմանը: Դուք կարող եք ավելացնել և կարդալ հաղորդագրություններ ցանկի սկզբից կամ վերջից: Այս կառուցվածքի հիման վրա դուք կարող եք լավ բուրգ կամ հերթ ստեղծել ձեր բաշխված համակարգի համար, և շատ դեպքերում դա բավարար կլինի: Հիմնական տարբերությունները Redis Pub/Sub-ից.
- Հաղորդագրությունը առաքվում է մեկ հաճախորդի: Առաջին ընթերցմամբ արգելափակված հաճախորդը նախ կստանա տվյալները:
- Քլինթն ինքը պետք է սկսի յուրաքանչյուր հաղորդագրության ընթերցման գործողությունը: List-ը ոչինչ չգիտի հաճախորդների մասին:
- Հաղորդագրությունները պահվում են այնքան ժամանակ, մինչև ինչ-որ մեկը կարդա դրանք կամ բացահայտորեն չջնջի դրանք: Եթե դուք կարգավորեք Redis սերվերը, որպեսզի տվյալները լցվի սկավառակի վրա, ապա համակարգի հուսալիությունը կտրուկ աճում է:
Ներածություն Stream-ին
Հոսքի մեջ մուտքի ավելացում
Թիմ XADD ավելացնում է նոր մուտք դեպի հոսք: Գրառումը պարզապես տող չէ, այն բաղկացած է մեկ կամ մի քանի բանալի-արժեք զույգերից: Այսպիսով, յուրաքանչյուր գրառում արդեն կառուցված է և նման է CSV ֆայլի կառուցվածքին:
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
Վերևի օրինակում հոսքին ավելացնում ենք երկու դաշտ՝ «mystream» անունով (բանալի)՝ «sensor-id» և «ջերմաստիճան»՝ համապատասխանաբար «1234» և «19.8» արժեքներով: Որպես երկրորդ արգումենտ, հրամանը վերցնում է նույնացուցիչ, որը վերագրվելու է մուտքին. այս նույնացուցիչը եզակիորեն նույնացնում է հոսքի յուրաքանչյուր մուտքը: Այնուամենայնիվ, այս դեպքում մենք անցանք *, քանի որ ցանկանում ենք, որ Redis-ը մեզ համար նոր ID գեներացնի: Յուրաքանչյուր նոր ID-ն կավելանա: Հետևաբար, յուրաքանչյուր նոր մուտք կունենա ավելի բարձր նույնացուցիչ՝ նախորդ գրառումների համեմատությամբ:
Նույնացուցիչի ձևաչափը
Հրամանով վերադարձված մուտքի ID-ն XADD, բաղկացած է երկու մասից.
{millisecondsTime}-{sequenceNumber}
millisecondsTime — Unix-ի ժամանակը միլիվայրկյաններով (Redis սերվերի ժամանակը): Այնուամենայնիվ, եթե ընթացիկ ժամը նույնն է կամ պակաս է նախորդ ձայնագրության ժամանակից, ապա օգտագործվում է նախորդ ձայնագրության ժամանակի դրոշմը: Հետևաբար, եթե սերվերի ժամանակը վերադառնում է ժամանակի հետ, նոր նույնացուցիչը դեռևս կպահպանի ավելացման հատկությունը:
հաջորդականությունՀամար օգտագործվում է նույն միլիվայրկյանում ստեղծված գրառումների համար: հաջորդականությունՀամար կավելացվի 1-ով նախորդ մուտքի համեմատ: Քանի որ հաջորդականությունՀամար ունի 64 բիթ չափ, այնուհետև գործնականում չպետք է սահմանափակվի գրառումների քանակի սահմանափակում, որը կարող է ստեղծվել մեկ միլիվայրկյանում:
Նման նույնացուցիչների ձևաչափն առաջին հայացքից կարող է տարօրինակ թվալ: Անվստահ ընթերցողը կարող է մտածել, թե ինչու է ժամանակը նույնացուցիչի մի մասը: Պատճառն այն է, որ Redis հոսքերը աջակցում են միջակայքի հարցումները ID-ով: Քանի որ նույնացուցիչը կապված է գրառումի ստեղծման ժամանակի հետ, դա հնարավորություն է տալիս հարցումներ կատարել ժամանակային միջակայքերում: Մենք կանդրադառնանք կոնկրետ օրինակին, երբ նայենք հրամանին XRANGE.
Եթե ինչ-ինչ պատճառներով օգտատերը պետք է նշի իր սեփական նույնացուցիչը, որն, օրինակ, կապված է ինչ-որ արտաքին համակարգի հետ, ապա մենք կարող ենք այն փոխանցել հրամանին. XADD *-ի փոխարեն, ինչպես ցույց է տրված ստորև.
> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2
Խնդրում ենք նկատի ունենալ, որ այս դեպքում դուք պետք է ինքներդ վերահսկեք ID-ի ավելացումը: Մեր օրինակում նվազագույն նույնացուցիչը «0-1» է, ուստի հրամանը չի ընդունի մեկ այլ նույնացուցիչ, որը հավասար է կամ փոքր է «0-1»-ին:
> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
Մեկ հոսքի համար գրառումների քանակը
Հոսքի մեջ գրառումների քանակը հնարավոր է ստանալ պարզապես հրամանի միջոցով XLEN. Մեր օրինակի համար այս հրամանը կվերադարձնի հետևյալ արժեքը.
> XLEN somestream
(integer) 2
Շրջանակի հարցումներ - XRANGE և XREVRANGE
Տվյալներ ըստ տիրույթի պահանջելու համար մենք պետք է նշենք երկու նույնացուցիչ՝ միջակայքի սկիզբը և վերջը: Վերադարձված միջակայքը կներառի բոլոր տարրերը, ներառյալ սահմանները: Կան նաև երկու հատուկ նույնացուցիչներ «-» և «+», որոնք համապատասխանաբար նշանակում են հոսքի ամենափոքր (առաջին գրառումը) և ամենամեծ (վերջին գրառումը) նույնացուցիչը: Ստորև բերված օրինակը ցույց կտա հոսքի բոլոր գրառումները:
> XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
Յուրաքանչյուր վերադարձված գրառում իրենից ներկայացնում է երկու տարրերից բաղկացած զանգված՝ նույնացուցիչ և բանալի-արժեք զույգերի ցանկ: Մենք արդեն ասացինք, որ գրառումների նույնացուցիչները կապված են ժամանակի հետ: Հետևաբար, մենք կարող ենք պահանջել որոշակի ժամանակահատվածի շրջանակ: Այնուամենայնիվ, մենք կարող ենք հարցման մեջ նշել ոչ թե ամբողջական նույնացուցիչը, այլ միայն Unix ժամանակը, բաց թողնելով դրա հետ կապված մասը. հաջորդականությունՀամար. Նույնացուցիչի բաց թողնված մասը ինքնաբերաբար կսահմանվի զրոյի տիրույթի սկզբում և առավելագույն հնարավոր արժեքի միջակայքի վերջում: Ստորև բերված է օրինակ, թե ինչպես կարող եք պահանջել երկու միլիվայրկյան տիրույթ:
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
Մենք ունենք միայն մեկ մուտք այս տիրույթում, սակայն իրական տվյալների հավաքածուներում վերադարձված արդյունքը կարող է հսկայական լինել: Այս պատճառով XRANGE աջակցում է COUNT տարբերակը: Նշելով քանակությունը, մենք կարող ենք պարզապես ստանալ առաջին N գրառումները: Եթե մեզ անհրաժեշտ լինի ստանալ հաջորդ N գրառումները (էջադրում), ապա կարող ենք օգտագործել վերջին ստացված ID-ն, ավելացնել այն հաջորդականությունՀամար մեկով և նորից հարցրեք. Դիտարկենք սա հետևյալ օրինակում։ Մենք սկսում ենք ավելացնել 10 տարր հետ XADD (ենթադրելով, որ mystream-ն արդեն լցված է 10 տարրով): Մեկ հրամանի համար 2 տարր ստանալով կրկնությունը սկսելու համար մենք սկսում ենք ամբողջ տիրույթից, բայց COUNT-ով, որը հավասար է 2-ի:
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
Հաջորդ երկու տարրերի հետ կրկնությունը շարունակելու համար մենք պետք է ընտրենք ստացված վերջին ID-ն, այսինքն՝ 1519073279157-0, և ավելացնենք 1-ը: հաջորդականությունՀամար.
Ստացված ID-ն, այս դեպքում՝ 1519073279157-1, այժմ կարող է օգտագործվել որպես ընդգրկույթի նոր մեկնարկի արգումենտ հաջորդ զանգի համար։ XRANGE:
> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
2) 1) "foo"
2) "value_3"
2) 1) 1519073281432-0
2) 1) "foo"
2) "value_4"
Եվ այսպես շարունակ։ Քանի որ բարդությունը XRANGE Որոնման համար O(log(N)) է, իսկ այնուհետև O(M)՝ M տարրերը վերադարձնելու համար, ապա յուրաքանչյուր կրկնվող քայլ արագ է: Այսպիսով, օգտագործելով XRANGE հոսքերը կարող են արդյունավետ կերպով կրկնվել:
Թիմ XREVRANGE համարժեք է XRANGE, բայց տարրերը վերադարձնում է հակառակ հերթականությամբ.
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
2) 1) "foo"
2) "value_10"
Խնդրում ենք նկատի ունենալ, որ հրամանը XREVRANGE վերցնում է միջակայքի արգումենտները սկսել և դադարեցնել հակառակ հերթականությամբ:
Նոր գրառումների ընթերցում XREAD-ի միջոցով
Հաճախ խնդիր է առաջանում բաժանորդագրվել հոսքին և ստանալ միայն նոր հաղորդագրություններ: Այս հայեցակարգը կարող է նման լինել Redis Pub/Sub-ին կամ արգելափակել Redis List-ը, սակայն կան հիմնարար տարբերություններ Redis Stream-ի օգտագործման մեջ.
- Յուրաքանչյուր նոր հաղորդագրություն լռելյայն հանձնվում է յուրաքանչյուր բաժանորդի: Այս պահվածքը տարբերվում է արգելափակող Redis List-ից, որտեղ նոր հաղորդագրությունը կկարդա միայն մեկ բաժանորդ:
- Մինչ Redis Pub/Sub-ում բոլոր հաղորդագրությունները մոռացվում են և երբեք չեն պահպանվում, Stream-ում բոլոր հաղորդագրությունները պահվում են անորոշ ժամանակով (եթե հաճախորդը բացահայտորեն ջնջում է):
- Redis Stream-ը թույլ է տալիս տարբերակել հաղորդագրությունների հասանելիությունը մեկ հոսքի ընթացքում: Հատուկ բաժանորդը կարող է տեսնել միայն իր անձնական հաղորդագրությունների պատմությունը:
Դուք կարող եք բաժանորդագրվել թեմայի և ստանալ նոր հաղորդագրություններ՝ օգտագործելով հրամանը XREAD. Դա մի փոքր ավելի բարդ է, քան XRANGE, ուստի նախ կսկսենք ավելի պարզ օրինակներով:
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
Վերևի օրինակը ցույց է տալիս չարգելափակող ձև XREAD. Նկատի ունեցեք, որ COUNT տարբերակը պարտադիր չէ: Փաստորեն, հրամանի միակ պահանջվող տարբերակը STREAMS տարբերակն է, որը սահմանում է հոսքերի ցանկը համապատասխան առավելագույն նույնացուցիչի հետ միասին: Մենք գրել ենք «STREAMS mystream 0» - մենք ցանկանում ենք ստանալ mystream հոսքի բոլոր գրառումները «0-0»-ից մեծ նույնացուցիչով: Ինչպես տեսնում եք օրինակից, հրամանը վերադարձնում է շարանի անունը, քանի որ մենք կարող ենք միաժամանակ բաժանորդագրվել մի քանի թելերի: Մենք կարող ենք գրել, օրինակ, «STREAMS mystream otherstream 0 0»: Խնդրում ենք նկատի ունենալ, որ STREAMS տարբերակից հետո մենք պետք է նախ տրամադրենք բոլոր անհրաժեշտ հոսքերի անունները և միայն այնուհետև նույնացուցիչների ցանկը:
Այս պարզ ձևով հրամանը որևէ առանձնահատուկ բան չի անում համեմատած XRANGE. Այնուամենայնիվ, հետաքրքիրն այն է, որ մենք հեշտությամբ կարող ենք շրջվել XREAD արգելափակող հրամանին՝ նշելով BLOCK արգումենտը.
> XREAD BLOCK 0 STREAMS mystream $
Վերևի օրինակում նոր BLOCK տարբերակ է նշվում 0 միլիվայրկյան ժամկետով (սա նշանակում է անժամկետ սպասել): Ավելին, mystream հոսքի սովորական նույնացուցիչը փոխանցելու փոխարեն փոխանցվել է $ հատուկ նույնացուցիչ: Այս հատուկ նույնացուցիչը նշանակում է, որ XREAD որպես նույնացուցիչ պետք է օգտագործի mystream-ի առավելագույն նույնացուցիչը: Այսպիսով, մենք նոր հաղորդագրություններ կստանանք միայն այն պահից, երբ սկսել ենք լսել: Որոշ առումներով սա նման է Unix «tail -f» հրամանին:
Նկատի ունեցեք, որ BLOCK տարբերակն օգտագործելիս պարտադիր չէ, որ օգտագործենք $ հատուկ նույնացուցիչը: Մենք կարող ենք օգտագործել հոսքում առկա ցանկացած նույնացուցիչ: Եթե թիմը կարող է անմիջապես սպասարկել մեր հարցումը՝ առանց արգելափակման, ապա դա կանի, հակառակ դեպքում՝ կարգելափակվի։
Արգելափակում XREAD կարող է նաև միանգամից մի քանի թեմաներ լսել, պարզապես անհրաժեշտ է նշել դրանց անունները: Այս դեպքում հրամանը կվերադարձնի տվյալներ ստացած առաջին հոսքի գրառումը: Տվյալ շղթայի համար արգելափակված առաջին բաժանորդը նախ տվյալներ կստանա:
Սպառողների խմբեր
Որոշ առաջադրանքներում մենք ցանկանում ենք սահմանափակել բաժանորդների մուտքը հաղորդագրություններ մեկ թեմայի շրջանակներում: Օրինակ, որտեղ դա կարող է օգտակար լինել, աշխատողների հետ հաղորդագրությունների հերթն է, որը կստանա տարբեր հաղորդագրություններ շղթայից՝ թույլ տալով հաղորդագրությունների մշակումը մասշտաբավորել:
Եթե պատկերացնենք, որ մենք ունենք երեք բաժանորդ C1, C2, C3 և մի թեմա, որը պարունակում է հաղորդագրություններ 1, 2, 3, 4, 5, 6, 7, ապա հաղորդագրությունները կմատուցվեն ստորև ներկայացված գծապատկերում.
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
Այս էֆեկտին հասնելու համար Redis Stream-ը օգտագործում է սպառողների խումբ կոչվող հայեցակարգ: Այս հայեցակարգը նման է կեղծ բաժանորդին, որը տվյալներ է ստանում հոսքից, բայց իրականում սպասարկվում է մի քանի բաժանորդների կողմից խմբի ներսում՝ տրամադրելով որոշակի երաշխիքներ.
- Յուրաքանչյուր հաղորդագրություն ուղարկվում է խմբի ներսում գտնվող մեկ այլ բաժանորդի:
- Խմբի ներսում բաժանորդները նույնացվում են իրենց անունով, որը մեծատառերի զգայուն տող է: Եթե բաժանորդը ժամանակավորապես դուրս է գալիս խմբից, նա կարող է վերականգնվել խմբի մեջ՝ օգտագործելով իր յուրահատուկ անունը:
- Յուրաքանչյուր սպառողների խումբ հետևում է «առաջին չկարդացված հաղորդագրություն» հայեցակարգին: Երբ բաժանորդը պահանջում է նոր հաղորդագրություններ, նա կարող է ստանալ միայն հաղորդագրություններ, որոնք նախկինում երբևէ չեն առաքվել խմբի ներսում գտնվող որևէ բաժանորդի:
- Կա հրաման՝ հստակորեն հաստատելու, որ հաղորդագրությունը հաջողությամբ մշակվել է բաժանորդի կողմից: Քանի դեռ այս հրամանը չի կանչվել, պահանջվող հաղորդագրությունը կմնա «սպասող» կարգավիճակում։
- Սպառողների խմբի շրջանակներում յուրաքանչյուր բաժանորդ կարող է պահանջել իրեն ուղարկված, բայց դեռևս չմշակված հաղորդագրությունների պատմություն («սպասող» կարգավիճակում)
Ինչ-որ իմաստով խմբի վիճակը կարող է արտահայտվել հետևյալ կերպ.
+----------------------------------------+
| consumer_group_name: mygroup
| consumer_group_stream: somekey
| last_delivered_id: 1292309234234-92
|
| consumers:
| "consumer-1" with pending messages
| 1292309234234-4
| 1292309234232-8
| "consumer-42" with pending messages
| ... (and so forth)
+----------------------------------------+
Այժմ ժամանակն է ծանոթանալ Սպառողների խմբի հիմնական հրամաններին, մասնավորապես.
- XGROUP օգտագործվում է խմբեր ստեղծելու, ոչնչացնելու և կառավարելու համար
- XREADGROUP օգտագործվում է հոսքը խմբի միջոցով կարդալու համար
- XACK - այս հրամանը բաժանորդին թույլ է տալիս նշել հաղորդագրությունը որպես հաջող մշակված
Սպառողների խմբի ստեղծում
Ենթադրենք, որ mystream արդեն գոյություն ունի։ Այնուհետև խմբի ստեղծման հրամանը նման կլինի.
> XGROUP CREATE mystream mygroup $
OK
Խումբ ստեղծելիս պետք է նույնացուցիչ փոխանցենք, որից սկսած խումբը կստանա հաղորդագրություններ։ Եթե մենք պարզապես ցանկանում ենք ստանալ բոլոր նոր հաղորդագրությունները, ապա մենք կարող ենք օգտագործել $ հատուկ նույնացուցիչը (ինչպես վերը նշված մեր օրինակում): Եթե հատուկ նույնացուցիչի փոխարեն նշեք 0, ապա շղթայի բոլոր հաղորդագրությունները հասանելի կլինեն խմբին:
Այժմ, երբ խումբը ստեղծվել է, մենք կարող ենք անմիջապես սկսել հաղորդագրությունները կարդալ հրամանի միջոցով XREADGROUP. Այս հրամանը շատ նման է XREAD և աջակցում է կամընտիր BLOCK տարբերակը: Այնուամենայնիվ, կա պահանջվող GROUP տարբերակ, որը միշտ պետք է նշվի երկու արգումենտով՝ խմբի անվանումը և բաժանորդի անունը: Աջակցվում է նաև COUNT տարբերակը:
Նախքան թեման կարդալը, եկեք այնտեղ տեղադրենք մի քանի հաղորդագրություն.
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0
Այժմ փորձենք կարդալ այս հոսքը խմբի միջոցով.
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
Վերոնշյալ հրամանը բառացիորեն կարդում է հետևյալ կերպ.
«Ես՝ բաժանորդ Ալիսս, mygroup-ի անդամ, ցանկանում եմ կարդալ mystream-ից մեկ հաղորդագրություն, որը նախկինում երբեք որևէ մեկին չի հանձնվել»:
Ամեն անգամ, երբ բաժանորդը որևէ գործողություն է կատարում խմբի վրա, նա պետք է նշի իր անունը՝ եզակի կերպով նույնացնելով իրեն խմբի ներսում: Վերոնշյալ հրամանում կա ևս մեկ շատ կարևոր մանրամասն՝ «>» հատուկ նույնացուցիչը։ Այս հատուկ նույնացուցիչը զտում է հաղորդագրությունները՝ թողնելով միայն նրանք, որոնք նախկինում երբեք չեն առաքվել:
Բացի այդ, հատուկ դեպքերում կարող եք նշել իրական նույնացուցիչ, ինչպիսին է 0-ը կամ որևէ այլ վավեր նույնացուցիչ: Այս դեպքում հրամանը XREADGROUP Ձեզ կվերադարձնի «սպասող» կարգավիճակով հաղորդագրությունների պատմություն, որոնք առաքվել են նշված բաժանորդին (Ալիս), բայց դեռ չեն հաստատվել հրամանի միջոցով: XACK.
Մենք կարող ենք ստուգել այս վարքագիծը՝ անմիջապես նշելով ID 0, առանց տարբերակի COUNT. Մենք պարզապես կտեսնենք մեկ սպասող հաղորդագրություն, այսինքն՝ Apple-ի հաղորդագրությունը.
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
Այնուամենայնիվ, եթե մենք հաստատենք, որ հաղորդագրությունը հաջողությամբ մշակվել է, ապա այն այլևս չի ցուցադրվի.
> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set)
Հիմա Բոբի հերթն է ինչ-որ բան կարդալու.
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
2) 1) 1526569506935-0
2) 1) "message"
2) "strawberry"
Mygroup-ի անդամ Բոբը խնդրեց ոչ ավելի, քան երկու հաղորդագրություն: Հրահանգը հաղորդում է միայն չհանձնված հաղորդագրությունների մասին «>» հատուկ նույնացուցիչի շնորհիվ: Ինչպես տեսնում եք, «խնձոր» հաղորդագրությունը չի ցուցադրվի, քանի որ այն արդեն առաքվել է Ալիսին, ուստի Բոբը ստանում է «նարնջագույն» և «ելակ»:
Այս կերպ Ալիսը, Բոբը և խմբի ցանկացած այլ բաժանորդ կարող են կարդալ տարբեր հաղորդագրություններ նույն հոսքից: Նրանք կարող են նաև կարդալ չմշակված հաղորդագրությունների իրենց պատմությունը կամ նշել հաղորդագրությունները որպես մշակված:
Մի քանի բան պետք է հիշել.
- Հենց որ բաժանորդը հաղորդագրությունը համարի հրաման XREADGROUP, այս հաղորդագրությունը անցնում է «սպասող» վիճակի և նշանակվում է տվյալ բաժանորդին: Խմբի մյուս բաժանորդները չեն կարողանա կարդալ այս հաղորդագրությունը:
- Բաժանորդները ավտոմատ կերպով ստեղծվում են առաջին իսկ հիշատակումից հետո, դրանք հստակ ստեղծելու կարիք չկա:
- Հետ XREADGROUP դուք կարող եք միաժամանակ կարդալ բազմաթիվ տարբեր թեմաներից հաղորդագրություններ, սակայն դա աշխատելու համար նախ անհրաժեշտ է ստեղծել նույն անունով խմբեր յուրաքանչյուր թեմայի համար՝ օգտագործելով XGROUP
Վերականգնում ձախողումից հետո
Բաժանորդը կարող է վերականգնվել ձախողումից և վերընթերցել իր հաղորդագրությունների ցանկը «սպասող» կարգավիճակով: Այնուամենայնիվ, իրական աշխարհում բաժանորդները կարող են ի վերջո ձախողվել: Ի՞նչ է պատահում բաժանորդի խրված հաղորդագրությունների հետ, եթե բաժանորդը չի կարողանում վերականգնվել ձախողումից հետո:
Սպառողների խումբն առաջարկում է մի գործառույթ, որն օգտագործվում է հենց նման դեպքերում՝ երբ պետք է փոխել հաղորդագրությունների սեփականատիրոջը:
Առաջին բանը, որ դուք պետք է անեք, զանգահարեք հրամանը ԱՊԱՀՈՎ, որը ցուցադրում է խմբի բոլոր հաղորդագրությունները «սպասում» կարգավիճակով: Իր ամենապարզ ձևով հրամանը կանչվում է միայն երկու արգումենտով՝ թեմայի անվանումը և խմբի անվանումը.
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
2) "2"
Թիմը ցուցադրեց չմշակված հաղորդագրությունների քանակը ամբողջ խմբի և յուրաքանչյուր բաժանորդի համար: Մենք ունենք միայն Բոբը երկու չմարված հաղորդագրություններով, քանի որ Ալիսի պահանջած միակ հաղորդագրությունը հաստատվել է XACK.
Մենք կարող ենք լրացուցիչ տեղեկություններ պահանջել՝ օգտագործելով ավելի շատ փաստարկներ.
XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - նույնացուցիչների տիրույթ (կարող եք օգտագործել «-» և «+»)
{count} — առաքման փորձերի քանակը
{consumer-name} - խմբի անուն
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
2) 1) 1526569506935-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
Այժմ մենք ունենք մանրամասներ յուրաքանչյուր հաղորդագրության համար՝ ID, բաժանորդի անունը, անգործության ժամանակը միլիվայրկյաններով և վերջապես առաքման փորձերի քանակը: Մենք ունենք երկու հաղորդագրություն Բոբից, և նրանք անգործության են մատնվել 74170458 միլիվայրկյան, մոտ 20 ժամ:
Խնդրում ենք նկատի ունենալ, որ ոչ ոք մեզ չի խանգարում ստուգել, թե որն է հաղորդագրության բովանդակությունը պարզապես օգտագործելով XRANGE.
> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
Պարզապես պետք է կրկնել նույն նույնացուցիչը երկու անգամ արգումենտներում: Այժմ, երբ մենք որոշակի պատկերացում ունենք, Ալիսը կարող է որոշել, որ 20 ժամ դադարից հետո Բոբը, հավանաբար, չի վերականգնվի, և ժամանակն է հարցնել այդ հաղորդագրությունները և վերսկսել դրանք Բոբի մշակումը: Դրա համար մենք օգտագործում ենք հրամանը XՊԱՅՑՈՒՄ:
XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}
Օգտագործելով այս հրամանը, մենք կարող ենք ստանալ «օտար» հաղորդագրություն, որը դեռ չի մշակվել՝ փոխելով սեփականատերը {consumer}: Այնուամենայնիվ, մենք կարող ենք նաև տրամադրել նվազագույն անգործության ժամանակ {min-idle-time}: Սա օգնում է խուսափել մի իրավիճակից, երբ երկու հաճախորդներ փորձում են միաժամանակ փոխել նույն հաղորդագրությունների սեփականատիրոջը.
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
Առաջին հաճախորդը կվերակայի անգործության ժամանակը և կավելացնի առաքման հաշվիչը: Այսպիսով, երկրորդ հաճախորդը չի կարողանա պահանջել այն:
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
Հաղորդագրությունը հաջողությամբ ներկայացվել է Ալիսի կողմից, որն այժմ կարող է մշակել հաղորդագրությունը և հաստատել այն:
Վերոնշյալ օրինակից կարող եք տեսնել, որ հաջողված հարցումը վերադարձնում է հենց հաղորդագրության բովանդակությունը: Այնուամենայնիվ, դա անհրաժեշտ չէ: JUSTID տարբերակը կարող է օգտագործվել միայն հաղորդագրությունների նույնացուցիչները վերադարձնելու համար: Սա օգտակար է, եթե ձեզ չեն հետաքրքրում հաղորդագրության մանրամասները և ցանկանում եք բարձրացնել համակարգի աշխատանքը:
Առաքման հաշվիչ
Հաշվիչը, որը դուք տեսնում եք ելքում ԱՊԱՀՈՎ յուրաքանչյուր հաղորդագրության առաքումների թիվն է: Նման հաշվիչը ավելացվում է երկու եղանակով. երբ հաղորդագրությունը հաջողությամբ պահանջվում է միջոցով XՊԱՅՑՈՒՄ կամ երբ օգտագործվում է զանգ XREADGROUP.
Որոշ հաղորդագրությունների մի քանի անգամ առաքումը նորմալ է: Հիմնական բանն այն է, որ բոլոր հաղորդագրությունները ի վերջո մշակվեն: Երբեմն հաղորդագրություն մշակելիս խնդիրներ են առաջանում, քանի որ հաղորդագրությունն ինքնին վնասված է, կամ հաղորդագրության մշակումը սխալ է առաջացնում մշակողի կոդը: Այս դեպքում կարող է պարզվել, որ ոչ ոք չի կարողանա մշակել այս հաղորդագրությունը։ Քանի որ մենք ունենք առաքման փորձի հաշվիչ, մենք կարող ենք օգտագործել այս հաշվիչը՝ նման իրավիճակները հայտնաբերելու համար: Հետևաբար, երբ առաքումների քանակը հասնի ձեր նշած բարձր թվին, հավանաբար ավելի խելամիտ կլինի նման հաղորդագրություն տեղադրել մեկ այլ թեմայում և ծանուցում ուղարկել համակարգի ադմինիստրատորին:
Թեմայի վիճակը
Թիմ XINFO օգտագործվում է թեմայի և դրա խմբերի մասին տարբեր տեղեկություններ պահանջելու համար: Օրինակ, հիմնական հրամանն ունի հետևյալ տեսքը.
> XINFO STREAM mystream
1) length
2) (integer) 13
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1524494395530-0
2) 1) "a"
2) "1"
3) "b"
4) "2"
11) last-entry
12) 1) 1526569544280-0
2) 1) "message"
2) "banana"
Վերևի հրամանը ցուցադրում է ընդհանուր տեղեկություններ նշված հոսքի մասին: Հիմա մի փոքր ավելի բարդ օրինակ.
> XINFO GROUPS mystream
1) 1) name
2) "mygroup"
3) consumers
4) (integer) 2
5) pending
6) (integer) 2
2) 1) name
2) "some-other-group"
3) consumers
4) (integer) 1
5) pending
6) (integer) 0
Վերևի հրամանը ցուցադրում է ընդհանուր տեղեկություններ նշված թեմայի բոլոր խմբերի համար
> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 1
5) idle
6) (integer) 9104628
2) 1) name
2) "Bob"
3) pending
4) (integer) 1
5) idle
6) (integer) 83841983
Վերևի հրամանը ցուցադրում է տեղեկատվություն նշված հոսքի և խմբի բոլոր բաժանորդների համար:
Եթե մոռացել եք հրամանի շարահյուսությունը, պարզապես օգնություն խնդրեք հենց հրամանից.
> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}.
3) GROUPS {key} -- Show the stream consumer groups.
4) STREAM {key} -- Show information about the stream.
5) HELP -- Print this help.
Հոսքի չափի սահմանաչափ
Շատ հավելվածներ չեն ցանկանում հավերժ հավաքել տվյալներ հոսքի մեջ: Հաճախ օգտակար է յուրաքանչյուր շղթայի համար թույլատրելի առավելագույն թվով հաղորդագրություններ ունենալ: Այլ դեպքերում, օգտակար է բոլոր հաղորդագրությունները շղթայից տեղափոխել մեկ այլ մշտական պահեստ, երբ հասնի շղթայի նշված չափը: Դուք կարող եք սահմանափակել հոսքի չափը՝ օգտագործելով հրամանի MAXLEN պարամետրը XADD:
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
MAXLEN-ն օգտագործելիս հին գրառումներն ավտոմատ կերպով ջնջվում են, երբ հասնում են որոշակի երկարության, ուստի հոսքն ունի մշտական չափ: Այնուամենայնիվ, այս դեպքում էտումը տեղի չի ունենում Redis-ի հիշողության մեջ ամենաարդյունավետ ձևով: Դուք կարող եք բարելավել իրավիճակը հետևյալ կերպ.
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
Վերոհիշյալ օրինակի ~ փաստարկը նշանակում է, որ մենք պարտադիր չէ, որ հոսքի երկարությունը սահմանափակենք որոշակի արժեքով: Մեր օրինակում սա կարող է լինել 1000-ից մեծ կամ հավասար ցանկացած թիվ (օրինակ՝ 1000, 1010 կամ 1030): Մենք պարզապես հստակորեն նշել ենք, որ ցանկանում ենք, որ մեր հոսքը պահպանի առնվազն 1000 գրառում: Սա շատ ավելի արդյունավետ է դարձնում հիշողության կառավարումը Redis-ի ներսում:
Կա նաև առանձին թիմ XTRIM, որն անում է նույն բանը.
> XTRIM mystream MAXLEN 10
> XTRIM mystream MAXLEN ~ 10
Մշտական պահեստավորում և կրկնօրինակում
Redis Stream-ը ասինխրոն կերպով կրկնօրինակվում է ստրկական հանգույցներին և պահվում այնպիսի ֆայլերում, ինչպիսիք են AOF-ը (բոլոր տվյալների նկարը) և RDB-ն (գրելու բոլոր գործողությունների մատյան): Աջակցվում է նաև սպառողների խմբերի վիճակի կրկնօրինակումը: Հետևաբար, եթե հաղորդագրությունը գլխավոր հանգույցում գտնվում է «սպասող» կարգավիճակում, ապա ստրկական հանգույցներում այս հաղորդագրությունը կունենա նույն կարգավիճակը:
Առանձին տարրերի հեռացում հոսքից
Հաղորդագրությունները ջնջելու հատուկ հրաման կա XDEL. Հրամանը ստանում է շղթայի անունը, որին հաջորդում է ջնջվող հաղորդագրության ID-ները.
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
2) 1) "value"
2) "3"
Այս հրամանն օգտագործելիս պետք է հաշվի առնել, որ իրական հիշողությունը անմիջապես չի ազատվի:
Զրոյական երկարությամբ հոսքեր
Հոսքերի և այլ Redis տվյալների կառուցվածքների միջև տարբերությունն այն է, որ երբ տվյալների այլ կառուցվածքներն այլևս չունեն տարրեր իրենց ներսում, որպես կողմնակի ազդեցություն, տվյալների կառուցվածքն ինքնին կհեռացվի հիշողությունից: Այսպիսով, օրինակ, տեսակավորված հավաքածուն ամբողջությամբ կհեռացվի, երբ ZREM զանգը հեռացնի վերջին տարրը: Փոխարենը, թելերը թույլատրվում է մնալ հիշողության մեջ նույնիսկ առանց որևէ տարրերի ներսում:
Ամփոփում
Redis Stream-ը իդեալական է հաղորդագրությունների բրոքերների, հաղորդագրությունների հերթերի, միասնական գրանցման և պատմություն պահելու զրույցի համակարգեր ստեղծելու համար:
Ինչպես մի անգամ ասացի
Source: www.habr.com