Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով
Բարև, Հաբր: Այսօր մենք կկառուցենք մի համակարգ, որը կմշակի Apache Kafka-ի հաղորդագրությունների հոսքերը՝ օգտագործելով Spark Streaming-ը և կգրի մշակման արդյունքները AWS RDS ամպային տվյալների բազայում:
Պատկերացնենք, որ որոշակի վարկային հաստատություն մեզ խնդիր է դնում իր բոլոր մասնաճյուղերով մուտքային գործարքները «թռիչքով» մշակել: Դա կարող է արվել գանձապետարանի համար բաց արժութային դիրքի, գործարքների սահմանաչափերի կամ ֆինանսական արդյունքների և այլնի արագ հաշվարկման նպատակով:
Ինչպես իրականացնել այս դեպքը առանց կախարդական և կախարդական կախարդանքների օգտագործման - կարդացեք կտրվածքի տակ: Գնա՛
Իհարկե, իրական ժամանակում մեծ քանակությամբ տվյալների մշակումը լայն հնարավորություններ է տալիս ժամանակակից համակարգերում օգտագործելու համար: Դրա համար ամենահայտնի համակցություններից մեկը Apache Kafka-ի և Spark Streaming-ի տանդեմն է, որտեղ Կաֆկան ստեղծում է մուտքային հաղորդագրությունների փաթեթների հոսք, և Spark Streaming-ը մշակում է այդ փաթեթները որոշակի ժամանակային ընդմիջումով:
Հավելվածի սխալների հանդուրժողականությունը բարձրացնելու համար մենք կօգտագործենք անցակետեր: Այս մեխանիզմով, երբ Spark Streaming շարժիչը պետք է վերականգնի կորցրած տվյալները, այն պետք է միայն վերադառնա վերջին անցակետ և այնտեղից վերսկսի հաշվարկները:
Մշակված համակարգի ճարտարապետությունը
Օգտագործված բաղադրիչներ.
Apache Kafka բաշխված հրապարակում-բաժանորդագրվել հաղորդագրությունների համակարգ է: Հարմար է ինչպես օֆլայն, այնպես էլ առցանց հաղորդագրությունների սպառման համար: Տվյալների կորուստը կանխելու համար Կաֆկայի հաղորդագրությունները պահվում են սկավառակի վրա և կրկնօրինակվում կլաստերի ներսում: Kafka համակարգը կառուցված է ZooKeeper համաժամացման ծառայության վերևում;
Apache Spark Streaming - Spark բաղադրիչ հոսքային տվյալների մշակման համար: Spark Streaming մոդուլը կառուցված է միկրո խմբաքանակի ճարտարապետությամբ, որտեղ տվյալների հոսքը մեկնաբանվում է որպես փոքր տվյալների փաթեթների շարունակական հաջորդականություն: Spark Streaming-ը տվյալներ է վերցնում տարբեր աղբյուրներից և միավորում դրանք փոքր փաթեթների մեջ: Նոր փաթեթներ են ստեղծվում կանոնավոր պարբերականությամբ: Յուրաքանչյուր ժամանակային միջակայքի սկզբում ստեղծվում է նոր փաթեթ, և այդ ինտերվալի ընթացքում ստացված ցանկացած տվյալ ներառվում է փաթեթում: Ընդմիջման վերջում փաթեթների աճը դադարում է: Ինտերվալի չափը որոշվում է պարամետրով, որը կոչվում է խմբաքանակի միջակայք.
Apache Spark SQL - համատեղում է հարաբերական մշակումը Spark ֆունկցիոնալ ծրագրավորման հետ: Կառուցվածքային տվյալներ նշանակում են տվյալներ, որոնք ունեն սխեմա, այսինքն՝ դաշտերի միասնական հավաքածու բոլոր գրառումների համար: Spark SQL-ն աջակցում է տվյալների զանազան կառուցվածքային աղբյուրներից մուտքագրմանը և, սխեմայի տեղեկատվության առկայության շնորհիվ, այն կարող է արդյունավետ կերպով առբերել միայն գրառումների պահանջվող դաշտերը, ինչպես նաև տրամադրում է DataFrame API-ներ.
AWS RDS Համեմատաբար էժան ամպի վրա հիմնված հարաբերական տվյալների բազա է, վեբ ծառայություն, որը հեշտացնում է կարգավորումը, գործարկումը և մասշտաբը, և կառավարվում է անմիջապես Amazon-ի կողմից:
Կաֆկա սերվերի տեղադրում և գործարկում
Նախքան Kafka-ն ուղղակիորեն օգտագործելը, դուք պետք է համոզվեք, որ ունեք Java, քանի որ... JVM-ն օգտագործվում է աշխատանքի համար.
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Հաջորդ քայլը կամընտիր է: Փաստն այն է, որ լռելյայն կարգավորումները թույլ չեն տալիս լիովին օգտագործել Apache Kafka-ի բոլոր հնարավորությունները: Օրինակ՝ ջնջեք թեմա, կատեգորիա, խումբ, որին կարող են հրապարակվել հաղորդագրությունները: Սա փոխելու համար եկեք խմբագրենք կազմաձևման ֆայլը.
vim ~/kafka/config/server.properties
Ֆայլի վերջում ավելացրեք հետևյալը.
delete.topic.enable = true
Նախքան Kafka սերվերը գործարկելը, դուք պետք է գործարկեք ZooKeeper սերվերը; մենք կօգտագործենք օժանդակ սցենարը, որը գալիս է Kafka բաշխման հետ.
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper-ը հաջողությամբ սկսելուց հետո գործարկեք Kafka սերվերը առանձին տերմինալում.
Բաց թողնենք արտադրողին ու սպառողին նորաստեղծ թեմայի փորձարկման պահերը։ Լրացուցիչ մանրամասներ այն մասին, թե ինչպես կարող եք փորձարկել հաղորդագրություններ ուղարկելը և ստանալը, գրված են պաշտոնական փաստաթղթերում. Ուղարկեք մի քանի հաղորդագրություն. Դե, մենք անցնում ենք Python-ում արտադրող գրելուն՝ օգտագործելով KafkaProducer API-ն:
Պրոդյուսեր գրավոր
Արտադրողը կստեղծի պատահական տվյալներ՝ ամեն վայրկյան 100 հաղորդագրություն: Պատահական տվյալներ ասելով հասկանում ենք բառարան, որը բաղկացած է երեք դաշտից.
Մասնաճյուղ - վարկային հաստատության վաճառքի կետի անվանումը.
Արտարժույթ - գործարքի արժույթ;
Քանակ - գործարքի գումարը. Գումարը կլինի դրական թիվ, եթե դա Բանկի կողմից արժույթի գնում է, և բացասական թիվ, եթե այն վաճառք է:
Հաջորդը, օգտագործելով ուղարկելու մեթոդը, մենք հաղորդագրություն ենք ուղարկում սերվերին, մեզ անհրաժեշտ թեմային, JSON ձևաչափով.
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Սցենարը գործարկելիս տերմինալում ստանում ենք հետևյալ հաղորդագրությունները.
Սա նշանակում է, որ ամեն ինչ աշխատում է այնպես, ինչպես մենք էինք ուզում՝ պրոդյուսերը ստեղծում և հաղորդագրություններ է ուղարկում մեզ անհրաժեշտ թեմային:
Հաջորդ քայլը Spark-ի տեղադրումն ու այս հաղորդագրության հոսքի մշակումն է:
Apache Spark-ի տեղադրում
Apache Spark- ը ունիվերսալ և բարձր արդյունավետությամբ կլաստերային հաշվողական հարթակ է:
Spark-ը ավելի լավ է գործում, քան MapReduce մոդելի հանրաճանաչ իրականացումները՝ միաժամանակ աջակցելով հաշվարկման տեսակների ավելի լայն շրջանակին, ներառյալ ինտերակտիվ հարցումները և հոսքի մշակումը: Արագությունը կարևոր դեր է խաղում մեծ քանակությամբ տվյալներ մշակելիս, քանի որ հենց արագությունն է թույլ տալիս ինտերակտիվ աշխատել՝ առանց սպասելու րոպեներ կամ ժամեր ծախսելու: Spark-ի ամենամեծ ուժեղ կողմերից մեկը, որն այն դարձնում է այդքան արագ, հիշողության մեջ հաշվարկներ կատարելու նրա կարողությունն է:
Այս շրջանակը գրված է Scala-ում, այնպես որ նախ պետք է այն տեղադրել.
Գործարկեք ստորև նշված հրամանը bashrc-ում փոփոխություններ կատարելուց հետո.
source ~/.bashrc
AWS PostgreSQL-ի տեղակայում
Մնում է միայն տեղակայել տվյալների բազան, որտեղ մենք կվերբեռնենք մշակված տեղեկատվությունը հոսքերից: Դրա համար մենք կօգտագործենք AWS RDS ծառայությունը:
Գնացեք AWS վահանակ -> AWS RDS -> Տվյալների բազաներ -> Ստեղծեք տվյալների բազա.
Ընտրեք PostgreSQL և սեղմեք Հաջորդը.
Որովհետեւ Այս օրինակը միայն կրթական նպատակների համար է, մենք կօգտագործենք անվճար սերվեր «առնվազն» (Free Tier):
Այնուհետև մենք նշում ենք «Free Tier» բլոկում, և դրանից հետո մեզ ավտոմատ կերպով կառաջարկվի t2.micro դասի օրինակ՝ չնայած թույլ, այն անվճար է և բավականին հարմար է մեր առաջադրանքի համար.
Հաջորդը գալիս են շատ կարևոր բաներ՝ տվյալների բազայի օրինակի անունը, վարպետ օգտագործողի անունը և նրա գաղտնաբառը: Անվանենք օրինակը՝ myHabrTest, վարպետ օգտվող. հաբր, գաղտնաբառը: habr12345 և սեղմեք Հաջորդ կոճակը.
Հաջորդ էջում կան պարամետրեր, որոնք պատասխանատու են դրսից մեր տվյալների բազայի սերվերի հասանելիության և պորտի հասանելիության համար.
Եկեք ստեղծենք նոր կարգավորում VPC անվտանգության խմբի համար, որը թույլ կտա արտաքին մուտք գործել մեր տվյալների բազայի սերվեր 5432 պորտի միջոցով (PostgreSQL):
Եկեք գնանք AWS վահանակ առանձին դիտարկիչի պատուհանում դեպի VPC Dashboard -> Security Groups -> Create Security Group բաժինը:
Մենք սահմանել ենք Անվտանգության խմբի անունը՝ PostgreSQL, նկարագրություն, նշելով, թե որ VPC-ի հետ պետք է կապված լինի այս խումբը և սեղմեք Ստեղծել կոճակը.
Լրացրեք 5432 պորտի ներգնա կանոնները նորաստեղծ խմբի համար, ինչպես ցույց է տրված ստորև նկարում: Դուք չեք կարող ձեռքով նշել նավահանգիստը, բայց բացվող «Type» ցանկից ընտրեք PostgreSQL:
Խստորեն ասած, արժեքը ::/0 նշանակում է մուտքային տրաֆիկի հասանելիություն դեպի սերվեր ամբողջ աշխարհից, ինչը կանոնականորեն լիովին ճիշտ չէ, բայց օրինակը վերլուծելու համար եկեք մեզ թույլ տանք օգտագործել այս մոտեցումը.
Մենք վերադառնում ենք զննարկիչի էջ, որտեղ մենք բացել ենք «Կարգավորել առաջադեմ կարգավորումները» և ընտրել VPC անվտանգության խմբեր բաժնում -> Ընտրել առկա VPC անվտանգության խմբերը -> PostgreSQL:
Հաջորդը, Տվյալների բազայի ընտրանքներում -> Տվյալների բազայի անունը -> սահմանեք անունը - habrDB.
Մենք կարող ենք լռելյայն թողնել մնացած պարամետրերը, բացառությամբ անջատելու պահուստավորումը (պահուստավորման ժամկետը՝ 0 օր), մոնիտորինգը և Performance Insights, լռելյայն: Սեղմեք կոճակի վրա Ստեղծել տվյալների բազա:
Թելերի կարգավորիչ
Վերջնական փուլը կլինի Spark-ի մշակումը, որը յուրաքանչյուր երկու վայրկյանը մեկ կմշակի Կաֆկայից ստացվող նոր տվյալները և արդյունքները մուտքագրելու է տվյալների բազա:
Ինչպես նշվեց վերևում, անցակետերը SparkStreaming-ի հիմնական մեխանիզմն են, որը պետք է կազմաձևվի՝ ապահովելու սխալների հանդուրժողականություն: Մենք կօգտագործենք անցակետեր, և եթե ընթացակարգը ձախողվի, Spark Streaming մոդուլը միայն պետք է վերադառնա վերջին անցակետ և վերսկսի հաշվարկները դրանից՝ կորցրած տվյալները վերականգնելու համար:
Checkpointing-ը կարող է միացված լինել՝ անսարքության հանդուրժող, հուսալի ֆայլային համակարգի (օրինակ՝ HDFS, S3 և այլն) տեղեկատու սահմանելով, որտեղ կպահվեն անցակետի տեղեկատվությունը: Դա արվում է օգտագործելով, օրինակ.
streamingContext.checkpoint(checkpointDirectory)
Մեր օրինակում մենք կօգտագործենք հետևյալ մոտեցումը, այն է՝ եթե checkpointDirectory գոյություն ունի, ապա համատեքստը կվերստեղծվի անցակետի տվյալներից: Եթե գրացուցակը գոյություն չունի (այսինքն՝ կատարվում է առաջին անգամ), ապա functionToCreateContext-ը կանչվում է նոր համատեքստ ստեղծելու և DSstreams-ը կարգավորելու համար.
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Մենք ստեղծում ենք DirectStream օբյեկտ՝ «գործարքի» թեմային միանալու համար՝ օգտագործելով KafkaUtils գրադարանի createDirectStream մեթոդը.
Օգտագործելով Spark SQL, մենք կատարում ենք պարզ խմբավորում և արդյունքը ցուցադրում վահանակում.
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Ստանալով հարցման տեքստը և գործարկեք այն Spark SQL-ի միջոցով.
Եվ այնուհետև մենք պահպանում ենք ստացված ագրեգացված տվյալները AWS RDS-ի աղյուսակում: Ագրեգացիայի արդյունքները տվյալների բազայի աղյուսակում պահելու համար մենք կօգտագործենք DataFrame օբյեկտի գրելու մեթոդը.
Մի քանի խոսք AWS RDS-ի հետ կապ հաստատելու մասին: Մենք դրա համար օգտատերը և գաղտնաբառը ստեղծել ենք «AWS PostgreSQL-ի տեղակայում» քայլում: Դուք պետք է օգտագործեք Endpoint-ը որպես տվյալների բազայի սերվերի url, որը ցուցադրվում է Միացում և անվտանգություն բաժնում.
Spark-ը և Kafka-ն ճիշտ միացնելու համար դուք պետք է գործարկեք smark-submit-ի միջոցով՝ օգտագործելով արտեֆակտը: spark-streaming-kafka-0-8_2.11. Բացի այդ, մենք կօգտագործենք նաև արտեֆակտ PostgreSQL տվյալների բազայի հետ շփվելու համար, մենք դրանք կփոխանցենք --փաթեթների միջոցով:
Սկրիպտի ճկունության համար մենք որպես մուտքային պարամետրեր կներառենք նաև հաղորդագրությունների սերվերի անունը և թեման, որից ցանկանում ենք տվյալներ ստանալ։
Այսպիսով, ժամանակն է գործարկել և ստուգել համակարգի ֆունկցիոնալությունը.
Ամեն ինչ ստացվեց! Ինչպես տեսնում եք ստորև նկարում, մինչ հավելվածն աշխատում է, յուրաքանչյուր 2 վայրկյանը մեկ թողարկվում են համախմբման նոր արդյունքներ, քանի որ մենք StreamingContext օբյեկտը ստեղծելիս խմբաքանակի միջակայքը սահմանել ենք 2 վայրկյան.
Այնուհետև մենք պարզ հարցում ենք կատարում տվյալների բազայում՝ ստուգելու աղյուսակում գրառումների առկայությունը գործարքի_հոսք:
Ամփոփում
Այս հոդվածը դիտարկել է տեղեկատվության հոսքային մշակման օրինակ՝ օգտագործելով Spark Streaming-ը Apache Kafka-ի և PostgreSQL-ի հետ համատեղ: Տարբեր աղբյուրներից տվյալների աճի պայմաններում դժվար է գերագնահատել Spark Streaming-ի գործնական արժեքը հոսքային և իրական ժամանակում հավելվածներ ստեղծելու համար:
Դուք կարող եք գտնել ամբողջական աղբյուրի կոդը իմ պահոցում GitHub.
Ես ուրախ եմ քննարկելու այս հոդվածը, անհամբեր սպասում եմ ձեր մեկնաբանություններին, ինչպես նաև հույս ունեմ կառուցողական քննադատության բոլոր հոգատար ընթերցողների կողմից:
Ձեզ հաջողություն եմ ցանկանում!
Սաղ. Սկզբում նախատեսվում էր օգտագործել տեղական PostgreSQL տվյալների բազան, բայց հաշվի առնելով իմ սերը AWS-ի հանդեպ, ես որոշեցի տվյալների բազան տեղափոխել ամպ: Այս թեմայի վերաբերյալ հաջորդ հոդվածում ես ցույց կտամ, թե ինչպես իրականացնել AWS-ում վերը նկարագրված ամբողջ համակարգը՝ օգտագործելով AWS Kinesis և AWS EMR: Հետևե՛ք նորություններին։