Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Habari, Habr! Leo tutaunda mfumo ambao utachakata mitiririko ya ujumbe wa Apache Kafka kwa kutumia Spark Streaming na kuandika matokeo ya usindikaji kwenye hifadhidata ya wingu ya AWS RDS.

Hebu fikiria kwamba taasisi fulani ya mikopo inatuwekea kazi ya kushughulikia miamala inayoingia "kwa kuruka" kwenye matawi yake yote. Hili linaweza kufanywa kwa madhumuni ya kukokotoa mara moja nafasi ya wazi ya fedha kwa hazina, vikomo au matokeo ya kifedha ya miamala, n.k.

Jinsi ya kutekeleza kesi hii bila matumizi ya uchawi na uchawi wa uchawi - soma chini ya kukata! Nenda!

Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark
(Chanzo cha picha)

Utangulizi

Bila shaka, usindikaji wa kiasi kikubwa cha data kwa wakati halisi hutoa fursa nyingi za matumizi katika mifumo ya kisasa. Mojawapo ya mchanganyiko maarufu zaidi wa hii ni tandem ya Apache Kafka na Spark Streaming, ambapo Kafka huunda mtiririko wa pakiti za ujumbe zinazoingia, na Spark Streaming huchakata pakiti hizi kwa muda fulani.

Ili kuongeza uvumilivu wa makosa ya programu, tutatumia vituo vya ukaguzi. Kwa utaratibu huu, wakati injini ya Spark Streaming inahitaji kurejesha data iliyopotea, inahitaji tu kurudi kwenye kituo cha ukaguzi cha mwisho na kuendelea na mahesabu kutoka hapo.

Usanifu wa mfumo ulioendelezwa

Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Vipengele vilivyotumika:

  • Apache Kafka ni mfumo wa utumaji ujumbe unaosambazwa wa uchapishaji unaofuata. Inafaa kwa matumizi ya ujumbe nje ya mtandao na mtandaoni. Ili kuzuia upotezaji wa data, ujumbe wa Kafka huhifadhiwa kwenye diski na kunakiliwa ndani ya nguzo. Mfumo wa Kafka umejengwa juu ya huduma ya ulandanishi ya ZooKeeper;
  • Utiririshaji wa Apache Spark - Sehemu ya Spark kwa usindikaji wa data ya utiririshaji. Moduli ya Utiririshaji wa Spark imeundwa kwa kutumia usanifu wa bechi ndogo, ambapo mtiririko wa data unafasiriwa kama mlolongo unaoendelea wa pakiti ndogo za data. Utiririshaji wa Spark huchukua data kutoka kwa vyanzo tofauti na kuichanganya kuwa vifurushi vidogo. Vifurushi vipya vinaundwa kwa vipindi vya kawaida. Mwanzoni mwa kila muda, pakiti mpya huundwa, na data yoyote iliyopokelewa wakati wa muda huo imejumuishwa kwenye pakiti. Mwishoni mwa muda, ukuaji wa pakiti huacha. Ukubwa wa muda unatambuliwa na parameter inayoitwa muda wa kundi;
  • SQL ya Apache Spark - inachanganya usindikaji wa uhusiano na programu ya kazi ya Spark. Data iliyopangwa inamaanisha data ambayo ina schema, yaani, seti moja ya sehemu za rekodi zote. Spark SQL inasaidia pembejeo kutoka kwa vyanzo mbalimbali vya data vilivyoundwa na, kutokana na upatikanaji wa taarifa za schema, inaweza kurejesha kwa ufanisi sehemu zinazohitajika za rekodi, na pia hutoa API za DataFrame;
  • AWS RDS ni hifadhidata ya uhusiano ya bei nafuu inayotegemea wingu, huduma ya tovuti ambayo hurahisisha usanidi, uendeshaji na kuongeza, na inasimamiwa moja kwa moja na Amazon.

Kufunga na kuendesha seva ya Kafka

Kabla ya kutumia Kafka moja kwa moja, unahitaji kuhakikisha kuwa una Java, kwa sababu... JVM inatumika kwa kazi:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Wacha tuunde mtumiaji mpya wa kufanya kazi na Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Ifuatayo, pakua usambazaji kutoka kwa wavuti rasmi ya Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Fungua kumbukumbu iliyopakuliwa:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Hatua inayofuata ni ya hiari. Ukweli ni kwamba mipangilio ya chaguo-msingi haikuruhusu kutumia kikamilifu uwezo wote wa Apache Kafka. Kwa mfano, futa mada, kategoria, kikundi ambacho ujumbe unaweza kuchapishwa. Ili kubadilisha hii, hebu tuhariri faili ya usanidi:

vim ~/kafka/config/server.properties

Ongeza yafuatayo hadi mwisho wa faili:

delete.topic.enable = true

Kabla ya kuanza seva ya Kafka, unahitaji kuanzisha seva ya ZooKeeper; tutatumia hati ya msaidizi inayokuja na usambazaji wa Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Baada ya ZooKeeper kuanza kwa mafanikio, zindua seva ya Kafka kwenye terminal tofauti:

bin/kafka-server-start.sh config/server.properties

Wacha tuunde mada mpya inayoitwa Muamala:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Wacha tuhakikishe kuwa mada iliyo na nambari inayohitajika ya sehemu na urudufishaji imeundwa:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Hebu tukose matukio ya kujaribu mzalishaji na mtumiaji kwa mada mpya iliyoundwa. Maelezo zaidi kuhusu jinsi unavyoweza kujaribu kutuma na kupokea ujumbe yameandikwa katika nyaraka rasmi - Tuma baadhi ya ujumbe. Kweli, tunaendelea kuandika mtayarishaji katika Python kwa kutumia API ya KafkaProducer.

Kuandika kwa mtayarishaji

Mtayarishaji atatoa data nasibu - ujumbe 100 kila sekunde. Kwa data nasibu tunamaanisha kamusi inayojumuisha sehemu tatu:

  • Tawi - jina la kituo cha uuzaji cha taasisi ya mkopo;
  • Sarafu - sarafu ya manunuzi;
  • kiasi - kiasi cha manunuzi. Kiasi hicho kitakuwa nambari chanya ikiwa ni ununuzi wa sarafu na Benki, na nambari hasi ikiwa ni mauzo.

Nambari ya mtayarishaji inaonekana kama hii:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Ifuatayo, kwa kutumia njia ya kutuma, tunatuma ujumbe kwa seva, kwa mada tunayohitaji, katika umbizo la JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Wakati wa kuendesha hati, tunapokea ujumbe ufuatao kwenye terminal:

Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Hii inamaanisha kuwa kila kitu hufanya kazi kama tulivyotaka - mtayarishaji hutoa na kutuma ujumbe kwa mada tunayohitaji.
Hatua inayofuata ni kusakinisha Spark na kuchakata mtiririko huu wa ujumbe.

Kufunga Apache Spark

Apache Spark ni jukwaa la jumla na la utendaji wa juu la kompyuta la nguzo.

Spark hufanya kazi vyema zaidi kuliko utekelezaji maarufu wa muundo wa MapReduce huku ikisaidia aina mbalimbali za ukokotoaji, ikiwa ni pamoja na hoja wasilianifu na uchakataji wa mtiririko. Kasi ina jukumu muhimu wakati wa kusindika idadi kubwa ya data, kwani ni kasi ambayo hukuruhusu kufanya kazi kwa maingiliano bila kutumia dakika au masaa kungojea. Mojawapo ya uwezo mkubwa wa Spark unaoifanya iwe haraka sana ni uwezo wake wa kufanya hesabu za kumbukumbu.

Mfumo huu umeandikwa katika Scala, kwa hivyo unahitaji kuisanikisha kwanza:

sudo apt-get install scala

Pakua usambazaji wa Spark kutoka kwa tovuti rasmi:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Fungua kumbukumbu:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Ongeza njia ya Spark kwenye faili ya bash:

vim ~/.bashrc

Ongeza mistari ifuatayo kupitia kihariri:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Tekeleza amri hapa chini baada ya kufanya mabadiliko kwa bashrc:

source ~/.bashrc

Inapeleka AWS PostgreSQL

Kilichosalia ni kupeleka hifadhidata ambamo tutapakia habari iliyochakatwa kutoka kwa mitiririko. Kwa hili tutatumia huduma ya AWS RDS.

Nenda kwa koni ya AWS -> AWS RDS -> Hifadhidata -> Unda hifadhidata:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Chagua PostgreSQL na ubonyeze Ifuatayo:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Kwa sababu Mfano huu ni kwa madhumuni ya kielimu pekee; tutatumia seva isiyolipishwa "kwa kiwango cha chini" (Kiwango cha Bure):
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Ifuatayo, tunaweka tiki kwenye Kizuizi cha Free Tier, na baada ya hapo tutapewa kiotomatiki mfano wa darasa la t2.micro - ingawa ni dhaifu, ni la bure na linafaa kabisa kwa kazi yetu:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Kisha huja mambo muhimu sana: jina la mfano wa hifadhidata, jina la mtumiaji mkuu na nenosiri lake. Wacha tupe mfano: myHabrTest, mtumiaji mkuu: habri, nenosiri: habr12345 na ubonyeze kitufe kifuatacho:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Katika ukurasa unaofuata kuna vigezo vinavyohusika na ufikivu wa seva yetu ya hifadhidata kutoka nje (ufikivu wa Umma) na upatikanaji wa mlango:

Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Hebu tuunde mpangilio mpya wa kikundi cha usalama cha VPC, ambacho kitaruhusu ufikiaji wa nje kwa seva yetu ya hifadhidata kupitia bandari 5432 (PostgreSQL).
Wacha tuende kwenye koni ya AWS kwenye kidirisha tofauti cha kivinjari kwa Dashibodi ya VPC -> Vikundi vya Usalama -> Unda sehemu ya kikundi cha usalama:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Tunaweka jina la kikundi cha Usalama - PostgreSQL, maelezo, yanaonyesha ni VPC gani kikundi hiki kinapaswa kuhusishwa nacho na ubofye kitufe cha Unda:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Jaza sheria zinazoingia za bandari 5432 za kikundi kipya, kama inavyoonekana kwenye picha hapa chini. Huwezi kubainisha bandari wewe mwenyewe, lakini chagua PostgreSQL kutoka kwenye orodha kunjuzi ya Aina.

Kusema kweli, thamani ::/0 inamaanisha upatikanaji wa trafiki inayoingia kwa seva kutoka duniani kote, ambayo si kweli kabisa, lakini ili kuchanganua mfano, hebu tujiruhusu kutumia mbinu hii:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Tunarudi kwenye ukurasa wa kivinjari, ambapo tuna "Sanidi mipangilio ya kina" iliyofunguliwa na uchague katika sehemu ya vikundi vya usalama vya VPC -> Chagua vikundi vya usalama vya VPC vilivyopo -> PostgreSQL:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Ifuatayo, katika chaguzi za Hifadhidata -> Jina la Hifadhidata -> weka jina - habrDB.

Tunaweza kuacha vigezo vilivyosalia, isipokuwa kuzima kipengele cha kuhifadhi nakala (kipindi cha kuhifadhi nakala - siku 0), ufuatiliaji na Maarifa ya Utendaji, kwa chaguo-msingi. Bofya kwenye kifungo Unda hifadhidata:
Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Kidhibiti cha nyuzi

Hatua ya mwisho itakuwa ukuzaji wa kazi ya Spark, ambayo itashughulikia data mpya kutoka kwa Kafka kila sekunde mbili na kuingiza matokeo kwenye hifadhidata.

Kama ilivyoonyeshwa hapo juu, vituo vya ukaguzi ni njia kuu katika SparkStreaming ambayo lazima isanidiwe ili kuhakikisha uvumilivu wa makosa. Tutatumia vituo vya ukaguzi na, ikiwa utaratibu hautafaulu, moduli ya Spark Streaming itahitaji tu kurudi kwenye kituo cha ukaguzi cha mwisho na kuanza tena mahesabu kutoka kwayo ili kurejesha data iliyopotea.

Ukaguzi unaweza kuwashwa kwa kuweka saraka kwenye mfumo wa faili unaostahimili hitilafu, unaotegemewa (kama vile HDFS, S3, n.k.) ambamo maelezo ya kituo cha ukaguzi yatahifadhiwa. Hii inafanywa kwa kutumia, kwa mfano:

streamingContext.checkpoint(checkpointDirectory)

Katika mfano wetu, tutatumia mbinu ifuatayo, yaani, ikiwa checkpointDirectory ipo, basi muktadha utaundwa upya kutoka kwa data ya ukaguzi. Ikiwa saraka haipo (yaani imetekelezwa kwa mara ya kwanza), basi functionToCreateContext inaitwa kuunda muktadha mpya na kusanidi DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Tunaunda kipengee cha DirectStream ili kuungana na mada ya "muamala" kwa kutumia njia ya kuundaDirectStream ya maktaba ya KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Kuchanganua data inayoingia katika umbizo la JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Kutumia Spark SQL, tunafanya kikundi rahisi na kuonyesha matokeo kwenye koni:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Kupata maandishi ya hoja na kuiendesha kupitia Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Na kisha tunahifadhi data iliyojumlishwa katika jedwali katika AWS RDS. Ili kuhifadhi matokeo ya ujumlisho kwenye jedwali la hifadhidata, tutatumia njia ya kuandika ya kitu cha DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Maneno machache kuhusu kusanidi muunganisho kwa AWS RDS. Tuliunda mtumiaji na nenosiri lake katika hatua ya "Kutumia AWS PostgreSQL". Unapaswa kutumia Endpoint kama url ya seva ya hifadhidata, ambayo inaonyeshwa katika sehemu ya Muunganisho na usalama:

Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Ili kuunganisha kwa usahihi Spark na Kafka, unapaswa kuendesha kazi kupitia smark-submit kwa kutumia kisanii. utiririshaji-cheche-kafka-0-8_2.11. Zaidi ya hayo, tutatumia vizalia vya programu kwa kuingiliana na hifadhidata ya PostgreSQL; tutazihamisha kupitia --packages.

Kwa unyumbufu wa hati, tutajumuisha pia kama vigezo vya kuingiza jina la seva ya ujumbe na mada ambayo tunataka kupokea data.

Kwa hivyo, ni wakati wa kuzindua na kuangalia utendaji wa mfumo:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Kila kitu kilifanyika! Kama unavyoona kwenye picha hapa chini, wakati programu inaendeshwa, matokeo mapya ya ujumlisho hutolewa kila baada ya sekunde 2, kwa sababu tuliweka muda wa kubandika hadi sekunde 2 tulipounda kipengee cha StreamingContext:

Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Ifuatayo, tunafanya swala rahisi kwa hifadhidata ili kuangalia uwepo wa rekodi kwenye jedwali mtiririko_wa_muamala:

Apache Kafka na Usindikaji wa Data wa Utiririshaji na Utiririshaji wa Spark

Hitimisho

Nakala hii iliangalia mfano wa usindikaji wa habari kwa kutumia Spark Streaming kwa kushirikiana na Apache Kafka na PostgreSQL. Pamoja na ukuaji wa data kutoka kwa vyanzo mbalimbali, ni vigumu kukadiria thamani ya vitendo ya Utiririshaji wa Spark kwa kuunda utiririshaji na programu za wakati halisi.

Unaweza kupata msimbo kamili wa chanzo kwenye hazina yangu kwa GitHub.

Nimefurahiya kujadili nakala hii, natarajia maoni yako, na pia natumai ukosoaji mzuri kutoka kwa wasomaji wote wanaojali.

Nakutakia mafanikio!

Zab. Hapo awali ilipangwa kutumia hifadhidata ya ndani ya PostgreSQL, lakini kwa kuzingatia upendo wangu kwa AWS, niliamua kuhamisha hifadhidata hadi kwa wingu. Katika makala inayofuata juu ya mada hii, nitaonyesha jinsi ya kutekeleza mfumo mzima ulioelezwa hapo juu katika AWS kwa kutumia AWS Kinesis na AWS EMR. Fuata habari!

Chanzo: mapenzi.com

Kuongeza maoni