Apache Kafka and Streaming Data Processing with Spark Streaming

Hello, Habr! Today we will build a system that processes Apache Kafka message streams with Spark Streaming and writes the processing results to an AWS RDS cloud database.

Imagine that a certain credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This can be done, for example, to promptly calculate the open currency position for the treasury, or the limits and financial results of transactions.

How do we implement this case without magic and spells? Read on below the cut! Go!


Introduction

Processing large volumes of data in real time opens up ample opportunities in modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka produces a stream of incoming message packets and Spark Streaming processes these packets at a given time interval.

To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume the calculations from there.

Architecture of the developed system

Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small packets. New packets are created at regular intervals. At the beginning of each time interval a new packet is created, and any data received during that interval is included in the packet. At the end of the interval the packet stops growing. The size of the interval is determined by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the schema information, can efficiently retrieve only the required fields of the records. It also provides the DataFrame API;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.
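
To make the micro-batch idea more tangible, here is a minimal plain-Python sketch (not Spark code) that groups timestamped events into packets by batch interval. The function name and the sample events are made up for illustration. Unlike Spark Streaming, this sketch simply skips intervals in which nothing arrived.

```python
from collections import defaultdict


def split_into_batches(events, batch_interval):
    """Group (timestamp, payload) events into consecutive batch intervals.

    Batch k collects every event whose timestamp falls into
    [k * batch_interval, (k + 1) * batch_interval).
    """
    batches = defaultdict(list)
    for timestamp, payload in events:
        batch_index = int(timestamp // batch_interval)
        batches[batch_index].append(payload)
    # Return the batches ordered by the start of their interval.
    return [batches[index] for index in sorted(batches)]


# Hypothetical events: (arrival time in seconds, message).
events = [(0.1, "a"), (0.9, "b"), (2.0, "c"), (3.5, "d"), (4.2, "e")]
batches = split_into_batches(events, batch_interval=2)
print(batches)
```

With a batch interval of 2 seconds, the first two events land in the first packet, the next two in the second, and the last one in the third.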

Installing and running the Kafka server

Before using Kafka itself, make sure you have Java installed, because Kafka runs on the JVM:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz -C /YOUR_PATH
ln -s /YOUR_PATH/kafka_2.12-2.2.0 ~/kafka

The next step is optional. The default settings do not let you use all the features of Apache Kafka. For example, you cannot delete a topic (the category or group to which messages are published). To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that comes with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Once ZooKeeper has started successfully, launch the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that the topic has been created with the required number of partitions and the required replication factor:

bin/kafka-topics.sh --describe --zookeeper localhost:2181


We will skip testing the producer and consumer for the newly created topic. How to test sending and receiving messages is described in the official documentation, in the "Send some messages" section. Instead, we move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data: 100 messages every second. By random data we mean a dictionary consisting of three fields:

  • branch: the name of the credit institution's point of sale;
  • currency: the transaction currency;
  • amount: the transaction amount. The amount is a positive number if the bank buys currency, and a negative number if it sells.

The code for the producer looks like this:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Next, using the send method, we send a message in JSON format to the server, to the topic we need:

from json import dumps

from kafka import KafkaProducer

# Serialize every message as UTF-8 JSON and compress it with gzip
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    # Block until the broker acknowledges the message (at most 10 seconds)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
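
The snippet above sends a single message, while the stated goal is about 100 messages per second. One possible way to pace the sending is sketched below. The helper send_at_rate and its parameters are hypothetical names, and the demo uses a plain list instead of a broker so that it runs anywhere.

```python
import time


def send_at_rate(send, make_message, messages_per_second, total_messages,
                 sleep=time.sleep):
    """Call send(make_message()) total_messages times at roughly the given rate."""
    pause = 1.0 / messages_per_second
    sent = 0
    for _ in range(total_messages):
        send(make_message())
        sent += 1
        sleep(pause)  # crude pacing; ignores the time spent inside send()
    return sent


# Stand-ins so the sketch runs without Kafka. With kafka-python, `send`
# could be: lambda msg: producer.send('transaction', value=msg)
outbox = []
sent_count = send_at_rate(
    send=outbox.append,
    make_message=lambda: {"branch": "Kazan", "currency": "RUB", "amount": 10},
    messages_per_second=100,
    total_messages=5,
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
print(sent_count, len(outbox))
```

In the real producer, make_message would be get_random_value, and the default sleep would keep the rate close to the target as long as sending itself is fast.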

When the script runs, the terminal shows a confirmation for each sent message with its topic, partition and offset.

This means that everything works as we wanted: the producer generates messages and sends them to the topic we need.
The next step is to install Spark and process this message stream.
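
Before moving on to Spark, one quick way to double-check the topic contents is a small consumer. The sketch below assumes the kafka-python package and the same broker address and topic as above; the function names are our own, and the limits (10 messages, 5 seconds of silence) are arbitrary.

```python
import json


def decode_message(raw_bytes):
    """Turn the UTF-8 JSON bytes written by the producer back into a dict."""
    return json.loads(raw_bytes.decode("utf-8"))


def read_messages(topic="transaction", servers=("localhost:9092",), limit=10):
    """Print up to `limit` messages from the topic, then stop."""
    # Imported here so decode_message can be tried without kafka-python.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=list(servers),
        auto_offset_reset="earliest",  # start from the oldest retained message
        consumer_timeout_ms=5000,      # stop iterating after 5 s without data
        value_deserializer=decode_message,
    )
    try:
        for count, message in enumerate(consumer, start=1):
            print(message.partition, message.offset, message.value)
            if count >= limit:
                break
    finally:
        consumer.close()


# The decoder can be checked on its own, without a running broker.
sample = decode_message(b'{"branch": "SPB", "currency": "USD", "amount": -42}')
print(sample)
```

Calling read_messages() with a running broker should print the partition, offset and payload of the messages sent earlier.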

Installing Apache Spark

Apache Spark is a general-purpose, high-performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed matters when processing large amounts of data, because it is speed that lets you work interactively instead of waiting for minutes or hours. One of the main reasons Spark is so fast is its ability to perform computations in memory.

The framework is written in Scala, so you need to install Scala first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1

Add the path to Spark to the bash configuration file:

vim ~/.bashrc

Add the following lines in the editor:

export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making the changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed information from the streams. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database.

Select PostgreSQL and click Next.

Since this example is for educational purposes only, we will use a free, minimal server (Free Tier).

Next, we tick the box in the Free Tier block, after which we are automatically offered an instance of the t2.micro class. It is weak, but it is free and quite suitable for our task.

Next come some very important settings: the name of the database instance, the name of the master user and the password. Let's name the instance myHabrTest, set the master user to habr and the password to habr12345, and click the Next button.

The next page contains the parameters responsible for the accessibility of our database server from outside (Public accessibility) and for port availability.

Let's create a new VPC security group that allows external access to our database server through port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console, to VPC Dashboard -> Security Groups -> Create security group.

We set the name of the security group to PostgreSQL, add a description, indicate which VPC the group should be associated with, and click the Create button.

Fill in the Inbound rules for port 5432 for the newly created group. You do not have to type the port manually; you can select PostgreSQL from the Type drop-down list instead.

Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from all over the world, which is not exactly best practice, but for the purposes of this example we will allow ourselves this approach.

We return to the browser page where "Configure advanced settings" is open and, in the VPC security groups section, select Choose existing VPC security groups -> PostgreSQL.

Next, in Database options -> Database name, set the name to habrDB.

We can leave the remaining parameters at their defaults, except that we disable backups (backup retention period: 0 days), monitoring and Performance Insights. Click the Create database button.

Stream handler

The final stage is the development of a Spark job that processes new data arriving from Kafka every two seconds and writes the result to the database.

As noted above, checkpoints are a core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints and, if the job fails, the Spark Streaming module only needs to return to the last checkpoint and resume the calculations from it to recover the lost data.

Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS or S3) in which the checkpoint information will be stored. This is done, for example, as follows:

streamingContext.checkpoint(checkpointDirectory)

In our example we will use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (that is, on the first run), functionToCreateContext is called to create a new context and set up the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:

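from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# The application name here is only an illustrative placeholder
conf = SparkConf().setAppName('transaction_stream')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # batch interval: 2 seconds

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})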
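
Putting checkpoint recovery and the direct stream together could look roughly like the sketch below. It is only an outline under several assumptions: the checkpoint path and the application name are invented, pprint() stands in for the real processing, and the pyspark.streaming.kafka module is the one shipped with Spark 2.x (it is not available in Spark 3).

```python
CHECKPOINT_DIR = "/tmp/spark_checkpoint"  # hypothetical path; use HDFS or S3 in practice
BATCH_INTERVAL_SECONDS = 2
BROKER_LIST = "localhost:9092"
TOPIC = "transaction"


def function_to_create_context():
    """Build a fresh StreamingContext; called only when no checkpoint exists."""
    # Imports live inside the function so this file loads without Spark.
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    conf = SparkConf().setAppName("transaction_stream")  # hypothetical name
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

    stream = KafkaUtils.createDirectStream(
        ssc, [TOPIC], {"metadata.broker.list": BROKER_LIST})
    # Placeholder output operation: print a few records of every batch.
    stream.pprint()

    # Register the checkpoint directory for the newly created context.
    ssc.checkpoint(CHECKPOINT_DIR)
    return ssc


def run():
    """Recover the context from the checkpoint, or create it, then start it."""
    from pyspark.streaming import StreamingContext

    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, function_to_create_context)
    ssc.start()
    ssc.awaitTermination()
```

The important detail is that all stream definitions live inside the factory function, so that a context restored from a checkpoint and a freshly created one describe the same computation.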

Parsing the incoming data in JSON format:

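from pyspark.sql import Row

# rdd holds one batch of messages already parsed from JSON into dicts;
# spark is the active SparkSession
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")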
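
The snippet above expects each element to be a dictionary already. As far as we know, the 0-8 direct stream delivers each record as a (key, value) pair with the value as a JSON string, so a small parsing step is needed first. The helper below is a hypothetical sketch of that step in plain Python.

```python
import json


def parse_record(record):
    """Convert one (key, value) pair from the stream into a plain dict."""
    _key, value = record
    data = json.loads(value)
    # Keep only the three fields the producer is known to send.
    return {
        "branch": data["branch"],
        "currency": data["currency"],
        "amount": int(data["amount"]),
    }


parsed = parse_record((None, '{"branch": "Surgut", "currency": "EUR", "amount": 17}'))
print(parsed)
```

In the job it could be applied as directKafkaStream.map(parse_record) before the rows are built.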

Using Spark SQL, we do a simple grouping and print the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
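
The body of get_sql_query is not shown here. A plausible minimal version, which simply returns the query text from above as a string, might look like this (an assumption, not necessarily the actual helper):

```python
def get_sql_query():
    """Return the aggregation query as a string (assumed helper body)."""
    return """
        select
            from_unixtime(unix_timestamp()) as curr_time,
            t.branch                        as branch_name,
            t.currency                      as currency_code,
            sum(t.amount)                   as batch_value
        from treasury_stream t
        group by
            t.branch,
            t.currency
    """


sql_query = get_sql_query()
print(sql_query)
```

Keeping the SQL in a separate function makes it easy to change the aggregation without touching the streaming code.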

Then we save the aggregated data to a table in AWS RDS. To write the aggregation results to a database table, we use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()
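
Hard-coding the password is fine for a demo, but in practice the connection settings would more likely come from the environment. Below is a small sketch of that idea; the environment variable names (RDS_HOST and so on) and the example host are our own invention.

```python
import os


def build_jdbc_options(env=os.environ):
    """Assemble the JDBC writer options from environment-style settings."""
    host = env["RDS_HOST"]
    port = env.get("RDS_PORT", "5432")
    database = env["RDS_DATABASE"]
    return {
        "driver": "org.postgresql.Driver",
        "url": "jdbc:postgresql://{}:{}/{}".format(host, port, database),
        "dbtable": env.get("RDS_TABLE", "transaction_flow"),
        "user": env["RDS_USER"],
        "password": env["RDS_PASSWORD"],
    }


# Example values only; a real run would read them from os.environ.
example_env = {
    "RDS_HOST": "db.example.com",
    "RDS_DATABASE": "habrDB",
    "RDS_USER": "habr",
    "RDS_PASSWORD": "not-a-real-password",
}
options = build_jdbc_options(example_env)
print(options["url"])
```

With a real DataFrame the dictionary can be passed in one call: testResultDataFrame.write.format("jdbc").mode("append").options(**options).save().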

A few words about setting up the connection to AWS RDS. We created the user and its password in the "Deploying AWS PostgreSQL" step. As the database server url, use the Endpoint shown in the Connectivity & security section.

To connect Spark and Kafka correctly, run the job via spark-submit with the artifact spark-streaming-kafka-0-8_2.11. In addition, we will use an artifact for interacting with the PostgreSQL database; we pass both via --packages.

To keep the script flexible, we also pass the address of the message server and the topic we want to read from as input parameters.
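
Reading these two parameters inside the script can be as simple as the sketch below. The function name and the usage message are hypothetical.

```python
import sys


def parse_args(argv):
    """Extract the broker list and topic from the command-line arguments."""
    if len(argv) != 3:
        raise SystemExit("Usage: spark_job.py <broker_list> <topic>")
    _script, broker_list, topic = argv
    return broker_list, topic


# Simulated command line; the real script would call parse_args(sys.argv).
broker_list, topic = parse_args(["spark_job.py", "localhost:9092", "transaction"])
print(broker_list, topic)
```

Failing early with a usage message avoids a less readable error later, when Spark tries to connect with missing settings.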

So, it's time to launch the system and check that it works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything worked! While the application is running, new aggregation results are printed every 2 seconds, because we set the batch interval to 2 seconds when we created the StreamingContext object.

Next, we run a simple query against the database to check that records have appeared in the transaction_flow table.
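
Such a check can also be done from Python. The sketch below assumes the psycopg2 package is installed and uses a simple row count as the check; the function name is our own, and the connection values have to be supplied by the caller.

```python
CHECK_QUERY = "select count(*) from transaction_flow"


def count_rows(host, database, user, password, port=5432):
    """Return the number of rows currently stored in transaction_flow."""
    # Imported lazily; assumes psycopg2 (e.g. psycopg2-binary) is installed.
    import psycopg2

    connection = psycopg2.connect(
        host=host, port=port, dbname=database, user=user, password=password)
    try:
        with connection.cursor() as cursor:
            cursor.execute(CHECK_QUERY)
            return cursor.fetchone()[0]
    finally:
        connection.close()
```

Called with the RDS Endpoint as host and habrDB as database, a growing count between two calls suggests that the job keeps appending batches.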

Conclusion

This article looked at an example of stream processing with Spark Streaming in conjunction with Apache Kafka and PostgreSQL. With data volumes from various sources growing, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all interested readers.

I wish you success!

P.S. Initially I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: www.habr.com
