Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Բարև, Հաբր: Այսօր մենք կկառուցենք մի համակարգ, որը կմշակի Apache Kafka-ի հաղորդագրությունների հոսքերը՝ օգտագործելով Spark Streaming-ը և կգրի մշակման արդյունքները AWS RDS ամպային տվյալների բազայում:

Պատկերացնենք, որ որոշակի վարկային հաստատություն մեզ խնդիր է դնում իր բոլոր մասնաճյուղերով մուտքային գործարքները «թռիչքով» մշակել: Դա կարող է արվել գանձապետարանի համար բաց արժութային դիրքի, գործարքների սահմանաչափերի կամ ֆինանսական արդյունքների և այլնի արագ հաշվարկման նպատակով:

Ինչպես իրականացնել այս դեպքը առանց կախարդական և կախարդական կախարդանքների օգտագործման - կարդացեք կտրվածքի տակ: Գնա՛

Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով
(Պատկերի աղբյուր)

Ներածություն

Իհարկե, իրական ժամանակում մեծ քանակությամբ տվյալների մշակումը լայն հնարավորություններ է տալիս ժամանակակից համակարգերում օգտագործելու համար: Դրա համար ամենահայտնի համակցություններից մեկը Apache Kafka-ի և Spark Streaming-ի տանդեմն է, որտեղ Կաֆկան ստեղծում է մուտքային հաղորդագրությունների փաթեթների հոսք, և Spark Streaming-ը մշակում է այդ փաթեթները որոշակի ժամանակային ընդմիջումով:

Հավելվածի սխալների հանդուրժողականությունը բարձրացնելու համար մենք կօգտագործենք անցակետեր: Այս մեխանիզմով, երբ Spark Streaming շարժիչը պետք է վերականգնի կորցրած տվյալները, այն պետք է միայն վերադառնա վերջին անցակետ և այնտեղից վերսկսի հաշվարկները:

Մշակված համակարգի ճարտարապետությունը

Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Օգտագործված բաղադրիչներ.

  • Apache Kafka բաշխված հրապարակում-բաժանորդագրվել հաղորդագրությունների համակարգ է: Հարմար է ինչպես օֆլայն, այնպես էլ առցանց հաղորդագրությունների սպառման համար: Տվյալների կորուստը կանխելու համար Կաֆկայի հաղորդագրությունները պահվում են սկավառակի վրա և կրկնօրինակվում կլաստերի ներսում: Kafka համակարգը կառուցված է ZooKeeper համաժամացման ծառայության վերևում;
  • Apache Spark Streaming - Spark բաղադրիչ հոսքային տվյալների մշակման համար: Spark Streaming մոդուլը կառուցված է միկրո խմբաքանակի ճարտարապետությամբ, որտեղ տվյալների հոսքը մեկնաբանվում է որպես փոքր տվյալների փաթեթների շարունակական հաջորդականություն: Spark Streaming-ը տվյալներ է վերցնում տարբեր աղբյուրներից և միավորում դրանք փոքր փաթեթների մեջ: Նոր փաթեթներ են ստեղծվում կանոնավոր պարբերականությամբ: Յուրաքանչյուր ժամանակային միջակայքի սկզբում ստեղծվում է նոր փաթեթ, և այդ ինտերվալի ընթացքում ստացված ցանկացած տվյալ ներառվում է փաթեթում: Ընդմիջման վերջում փաթեթների աճը դադարում է: Ինտերվալի չափը որոշվում է պարամետրով, որը կոչվում է խմբաքանակի միջակայք.
  • Apache Spark SQL - համատեղում է հարաբերական մշակումը Spark ֆունկցիոնալ ծրագրավորման հետ: Կառուցվածքային տվյալներ նշանակում են տվյալներ, որոնք ունեն սխեմա, այսինքն՝ դաշտերի միասնական հավաքածու բոլոր գրառումների համար: Spark SQL-ն աջակցում է տվյալների զանազան կառուցվածքային աղբյուրներից մուտքագրմանը և, սխեմայի տեղեկատվության առկայության շնորհիվ, այն կարող է արդյունավետ կերպով առբերել միայն գրառումների պահանջվող դաշտերը, ինչպես նաև տրամադրում է DataFrame API-ներ.
  • AWS RDS Համեմատաբար էժան ամպի վրա հիմնված հարաբերական տվյալների բազա է, վեբ ծառայություն, որը հեշտացնում է կարգավորումը, գործարկումը և մասշտաբը, և կառավարվում է անմիջապես Amazon-ի կողմից:

Կաֆկա սերվերի տեղադրում և գործարկում

Նախքան Kafka-ն ուղղակիորեն օգտագործելը, դուք պետք է համոզվեք, որ ունեք Java, քանի որ... JVM-ն օգտագործվում է աշխատանքի համար.

sudo apt-get update 
sudo apt-get install default-jre
java -version

Եկեք ստեղծենք նոր օգտվող՝ Կաֆկայի հետ աշխատելու համար.

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Հաջորդը, ներբեռնեք բաշխումը պաշտոնական Apache Kafka կայքից.

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Բացեք ներբեռնված արխիվը.

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Հաջորդ քայլը կամընտիր է: Փաստն այն է, որ լռելյայն կարգավորումները թույլ չեն տալիս լիովին օգտագործել Apache Kafka-ի բոլոր հնարավորությունները: Օրինակ՝ ջնջեք թեմա, կատեգորիա, խումբ, որին կարող են հրապարակվել հաղորդագրությունները: Սա փոխելու համար եկեք խմբագրենք կազմաձևման ֆայլը.

vim ~/kafka/config/server.properties

Ֆայլի վերջում ավելացրեք հետևյալը.

delete.topic.enable = true

Նախքան Kafka սերվերը գործարկելը, դուք պետք է գործարկեք ZooKeeper սերվերը; մենք կօգտագործենք օժանդակ սցենարը, որը գալիս է Kafka բաշխման հետ.

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper-ը հաջողությամբ սկսելուց հետո գործարկեք Kafka սերվերը առանձին տերմինալում.

bin/kafka-server-start.sh config/server.properties

Եկեք ստեղծենք նոր թեմա, որը կոչվում է Գործարք.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Եկեք համոզվենք, որ ստեղծվել է անհրաժեշտ քանակի միջնորմներով և կրկնօրինակմամբ թեմա.

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Բաց թողնենք արտադրողին ու սպառողին նորաստեղծ թեմայի փորձարկման պահերը։ Լրացուցիչ մանրամասներ այն մասին, թե ինչպես կարող եք փորձարկել հաղորդագրություններ ուղարկելը և ստանալը, գրված են պաշտոնական փաստաթղթերում. Ուղարկեք մի քանի հաղորդագրություն. Դե, մենք անցնում ենք Python-ում արտադրող գրելուն՝ օգտագործելով KafkaProducer API-ն:

Պրոդյուսեր գրավոր

Արտադրողը կստեղծի պատահական տվյալներ՝ ամեն վայրկյան 100 հաղորդագրություն: Պատահական տվյալներ ասելով հասկանում ենք բառարան, որը բաղկացած է երեք դաշտից.

  • Մասնաճյուղ - վարկային հաստատության վաճառքի կետի անվանումը.
  • Արտարժույթ - գործարքի արժույթ;
  • Քանակ - գործարքի գումարը. Գումարը կլինի դրական թիվ, եթե դա Բանկի կողմից արժույթի գնում է, և բացասական թիվ, եթե այն վաճառք է:

Արտադրողի կոդը այսպիսի տեսք ունի.

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Հաջորդը, օգտագործելով ուղարկելու մեթոդը, մենք հաղորդագրություն ենք ուղարկում սերվերին, մեզ անհրաժեշտ թեմային, JSON ձևաչափով.

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Սցենարը գործարկելիս տերմինալում ստանում ենք հետևյալ հաղորդագրությունները.

Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Սա նշանակում է, որ ամեն ինչ աշխատում է այնպես, ինչպես մենք էինք ուզում՝ պրոդյուսերը ստեղծում և հաղորդագրություններ է ուղարկում մեզ անհրաժեշտ թեմային:
Հաջորդ քայլը Spark-ի տեղադրումն ու այս հաղորդագրության հոսքի մշակումն է:

Apache Spark-ի տեղադրում

Apache Spark- ը ունիվերսալ և բարձր արդյունավետությամբ կլաստերային հաշվողական հարթակ է:

Spark-ը ավելի լավ է գործում, քան MapReduce մոդելի հանրաճանաչ իրականացումները՝ միաժամանակ աջակցելով հաշվարկման տեսակների ավելի լայն շրջանակին, ներառյալ ինտերակտիվ հարցումները և հոսքի մշակումը: Արագությունը կարևոր դեր է խաղում մեծ քանակությամբ տվյալներ մշակելիս, քանի որ հենց արագությունն է թույլ տալիս ինտերակտիվ աշխատել՝ առանց սպասելու րոպեներ կամ ժամեր ծախսելու: Spark-ի ամենամեծ ուժեղ կողմերից մեկը, որն այն դարձնում է այդքան արագ, հիշողության մեջ հաշվարկներ կատարելու նրա կարողությունն է:

Այս շրջանակը գրված է Scala-ում, այնպես որ նախ պետք է այն տեղադրել.

sudo apt-get install scala

Ներբեռնեք Spark բաշխումը պաշտոնական կայքից.

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Բացեք արխիվը.

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Ավելացնել Spark-ի ուղին bash ֆայլում՝

vim ~/.bashrc

Խմբագրի միջոցով ավելացրեք հետևյալ տողերը.

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Գործարկեք ստորև նշված հրամանը bashrc-ում փոփոխություններ կատարելուց հետո.

source ~/.bashrc

AWS PostgreSQL-ի տեղակայում

Մնում է միայն տեղակայել տվյալների բազան, որտեղ մենք կվերբեռնենք մշակված տեղեկատվությունը հոսքերից: Դրա համար մենք կօգտագործենք AWS RDS ծառայությունը:

Գնացեք AWS վահանակ -> AWS RDS -> Տվյալների բազաներ -> Ստեղծեք տվյալների բազա.
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Ընտրեք PostgreSQL և սեղմեք Հաջորդը.
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Որովհետեւ Այս օրինակը միայն կրթական նպատակների համար է, մենք կօգտագործենք անվճար սերվեր «առնվազն» (Free Tier):
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Այնուհետև մենք նշում ենք «Free Tier» բլոկում, և դրանից հետո մեզ ավտոմատ կերպով կառաջարկվի t2.micro դասի օրինակ՝ չնայած թույլ, այն անվճար է և բավականին հարմար է մեր առաջադրանքի համար.
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Հաջորդը գալիս են շատ կարևոր բաներ՝ տվյալների բազայի օրինակի անունը, վարպետ օգտագործողի անունը և նրա գաղտնաբառը: Անվանենք օրինակը՝ myHabrTest, վարպետ օգտվող. հաբր, գաղտնաբառը: habr12345 և սեղմեք Հաջորդ կոճակը.
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Հաջորդ էջում կան պարամետրեր, որոնք պատասխանատու են դրսից մեր տվյալների բազայի սերվերի հասանելիության և պորտի հասանելիության համար.

Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Եկեք ստեղծենք նոր կարգավորում VPC անվտանգության խմբի համար, որը թույլ կտա արտաքին մուտք գործել մեր տվյալների բազայի սերվեր 5432 պորտի միջոցով (PostgreSQL):
Եկեք գնանք AWS վահանակ առանձին դիտարկիչի պատուհանում դեպի VPC Dashboard -> Security Groups -> Create Security Group բաժինը:
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Մենք սահմանել ենք Անվտանգության խմբի անունը՝ PostgreSQL, նկարագրություն, նշելով, թե որ VPC-ի հետ պետք է կապված լինի այս խումբը և սեղմեք Ստեղծել կոճակը.
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Լրացրեք 5432 պորտի ներգնա կանոնները նորաստեղծ խմբի համար, ինչպես ցույց է տրված ստորև նկարում: Դուք չեք կարող ձեռքով նշել նավահանգիստը, բայց բացվող «Type» ցանկից ընտրեք PostgreSQL:

Խստորեն ասած, արժեքը ::/0 նշանակում է մուտքային տրաֆիկի հասանելիություն դեպի սերվեր ամբողջ աշխարհից, ինչը կանոնականորեն լիովին ճիշտ չէ, բայց օրինակը վերլուծելու համար եկեք մեզ թույլ տանք օգտագործել այս մոտեցումը.
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Մենք վերադառնում ենք զննարկիչի էջ, որտեղ մենք բացել ենք «Կարգավորել առաջադեմ կարգավորումները» և ընտրել VPC անվտանգության խմբեր բաժնում -> Ընտրել առկա VPC անվտանգության խմբերը -> PostgreSQL:
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Հաջորդը, Տվյալների բազայի ընտրանքներում -> Տվյալների բազայի անունը -> սահմանեք անունը - habrDB.

Մենք կարող ենք լռելյայն թողնել մնացած պարամետրերը, բացառությամբ անջատելու պահուստավորումը (պահուստավորման ժամկետը՝ 0 օր), մոնիտորինգը և Performance Insights, լռելյայն: Սեղմեք կոճակի վրա Ստեղծել տվյալների բազա:
Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Թելերի կարգավորիչ

Վերջնական փուլը կլինի Spark-ի մշակումը, որը յուրաքանչյուր երկու վայրկյանը մեկ կմշակի Կաֆկայից ստացվող նոր տվյալները և արդյունքները մուտքագրելու է տվյալների բազա:

Ինչպես նշվեց վերևում, անցակետերը SparkStreaming-ի հիմնական մեխանիզմն են, որը պետք է կազմաձևվի՝ ապահովելու սխալների հանդուրժողականություն: Մենք կօգտագործենք անցակետեր, և եթե ընթացակարգը ձախողվի, Spark Streaming մոդուլը միայն պետք է վերադառնա վերջին անցակետ և վերսկսի հաշվարկները դրանից՝ կորցրած տվյալները վերականգնելու համար:

Checkpointing-ը կարող է միացված լինել՝ անսարքության հանդուրժող, հուսալի ֆայլային համակարգի (օրինակ՝ HDFS, S3 և այլն) տեղեկատու սահմանելով, որտեղ կպահվեն անցակետի տեղեկատվությունը: Դա արվում է օգտագործելով, օրինակ.

streamingContext.checkpoint(checkpointDirectory)

Մեր օրինակում մենք կօգտագործենք հետևյալ մոտեցումը, այն է՝ եթե checkpointDirectory գոյություն ունի, ապա համատեքստը կվերստեղծվի անցակետի տվյալներից: Եթե ​​գրացուցակը գոյություն չունի (այսինքն՝ կատարվում է առաջին անգամ), ապա functionToCreateContext-ը կանչվում է նոր համատեքստ ստեղծելու և DSstreams-ը կարգավորելու համար.

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Մենք ստեղծում ենք DirectStream օբյեկտ՝ «գործարքի» թեմային միանալու համար՝ օգտագործելով KafkaUtils գրադարանի createDirectStream մեթոդը.

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Մուտքային տվյալների վերլուծություն JSON ձևաչափով.

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Օգտագործելով Spark SQL, մենք կատարում ենք պարզ խմբավորում և արդյունքը ցուցադրում վահանակում.

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Ստանալով հարցման տեքստը և գործարկեք այն Spark SQL-ի միջոցով.

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Եվ այնուհետև մենք պահպանում ենք ստացված ագրեգացված տվյալները AWS RDS-ի աղյուսակում: Ագրեգացիայի արդյունքները տվյալների բազայի աղյուսակում պահելու համար մենք կօգտագործենք DataFrame օբյեկտի գրելու մեթոդը.

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Մի քանի խոսք AWS RDS-ի հետ կապ հաստատելու մասին: Մենք դրա համար օգտատերը և գաղտնաբառը ստեղծել ենք «AWS PostgreSQL-ի տեղակայում» քայլում: Դուք պետք է օգտագործեք Endpoint-ը որպես տվյալների բազայի սերվերի url, որը ցուցադրվում է Միացում և անվտանգություն բաժնում.

Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Spark-ը և Kafka-ն ճիշտ միացնելու համար դուք պետք է գործարկեք smark-submit-ի միջոցով՝ օգտագործելով արտեֆակտը: spark-streaming-kafka-0-8_2.11. Բացի այդ, մենք կօգտագործենք նաև արտեֆակտ PostgreSQL տվյալների բազայի հետ շփվելու համար, մենք դրանք կփոխանցենք --փաթեթների միջոցով:

Սկրիպտի ճկունության համար մենք որպես մուտքային պարամետրեր կներառենք նաև հաղորդագրությունների սերվերի անունը և թեման, որից ցանկանում ենք տվյալներ ստանալ։

Այսպիսով, ժամանակն է գործարկել և ստուգել համակարգի ֆունկցիոնալությունը.

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Ամեն ինչ ստացվեց! Ինչպես տեսնում եք ստորև նկարում, մինչ հավելվածն աշխատում է, յուրաքանչյուր 2 վայրկյանը մեկ թողարկվում են համախմբման նոր արդյունքներ, քանի որ մենք StreamingContext օբյեկտը ստեղծելիս խմբաքանակի միջակայքը սահմանել ենք 2 վայրկյան.

Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Այնուհետև մենք պարզ հարցում ենք կատարում տվյալների բազայում՝ ստուգելու աղյուսակում գրառումների առկայությունը գործարքի_հոսք:

Apache Kafka և հոսքային տվյալների մշակում Spark Streaming-ով

Ամփոփում

Այս հոդվածը դիտարկել է տեղեկատվության հոսքային մշակման օրինակ՝ օգտագործելով Spark Streaming-ը Apache Kafka-ի և PostgreSQL-ի հետ համատեղ: Տարբեր աղբյուրներից տվյալների աճի պայմաններում դժվար է գերագնահատել Spark Streaming-ի գործնական արժեքը հոսքային և իրական ժամանակում հավելվածներ ստեղծելու համար:

Դուք կարող եք գտնել ամբողջական աղբյուրի կոդը իմ պահոցում GitHub.

Ես ուրախ եմ քննարկելու այս հոդվածը, անհամբեր սպասում եմ ձեր մեկնաբանություններին, ինչպես նաև հույս ունեմ կառուցողական քննադատության բոլոր հոգատար ընթերցողների կողմից:

Ձեզ հաջողություն եմ ցանկանում!

Սաղ. Սկզբում նախատեսվում էր օգտագործել տեղական PostgreSQL տվյալների բազան, բայց հաշվի առնելով իմ սերը AWS-ի հանդեպ, ես որոշեցի տվյալների բազան տեղափոխել ամպ: Այս թեմայի վերաբերյալ հաջորդ հոդվածում ես ցույց կտամ, թե ինչպես իրականացնել AWS-ում վերը նկարագրված ամբողջ համակարգը՝ օգտագործելով AWS Kinesis և AWS EMR: Հետևե՛ք նորություններին։

Source: www.habr.com

Добавить комментарий