Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Salama, Habr! Androany dia hanangana rafitra iray izay handrindra ny hafatra Apache Kafka amin'ny alàlan'ny Spark Streaming isika ary hanoratra ny valin'ny fanodinana amin'ny angona rahona AWS RDS.

Alao sary an-tsaina fa ny andrim-panjakana iray amin'ny fampindramam-bola iray dia mametraka ny andraikitra amin'ny fanodinana ny fifampiraharahana ho avy "amin'ny sidina" manerana ny sampana rehetra. Izany dia azo atao amin'ny tanjona kajy haingana ny toeran'ny vola misokatra ho an'ny tahirim-bola, fetra na vokatra ara-bola ho an'ny fifanakalozana, sns.

Ahoana ny fampiharana ity raharaha ity raha tsy mampiasa ody sy ody - vakio eo ambanin'ny tapaka! Mandehana!

Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming
(Loharano sary)

fampidirana

Mazava ho azy, ny fanodinana angon-drakitra be dia be amin'ny fotoana tena izy dia manome fahafahana betsaka hampiasaina amin'ny rafitra maoderina. Ny iray amin'ireo fampifangaroana malaza indrindra amin'izany dia ny tandem an'ny Apache Kafka sy Spark Streaming, izay i Kafka dia mamorona andiana fonosana hafatra ho avy, ary ny Spark Streaming dia manodina ireo fonosana ireo amin'ny fotoana voafaritra.

Mba hampitomboana ny fandeferana diso amin'ny fampiharana dia hampiasa toeram-pisavana izahay. Miaraka amin'io mekanika io, rehefa mila mamerina ny angon-drakitra very ny motera Spark Streaming, dia mila miverina any amin'ny toeram-pisavana farany fotsiny izy ary manohy ny kajy avy eo.

Architecture ny rafitra novolavolaina

Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Ireo singa ampiasaina:

  • Apache Kafka dia rafitra fandefasana hafatra famoaham-baovao misoratra anarana. Mety amin'ny fampiasana hafatra an-tserasera sy an-tserasera. Mba hisorohana ny fahaverezan'ny angona dia voatahiry ao anaty kapila ny hafatra Kafka ary averina ao anaty cluster. Ny rafitra Kafka dia naorina eo an-tampon'ny serivisy fampifanarahana ZooKeeper;
  • Apache Spark Streaming - singa Spark ho an'ny fanodinana angon-drakitra mivantana. Ny maody Spark Streaming dia natsangana tamin'ny alàlan'ny maritrano micro-batch, izay adika ho toy ny filaharana mitohy amin'ny fonosana data kely ny stream data. Ny Spark Streaming dia maka angona avy amin'ny loharano samihafa ary manambatra azy ho fonosana kely. Ny fonosana vaovao dia noforonina amin'ny fotoana tsy tapaka. Eo am-piandohan'ny elanelam-potoana tsirairay, dia misy fonosana vaovao noforonina, ary izay angona voaray nandritra io elanelam-potoana io dia tafiditra ao anatin'ilay fonosana. Amin'ny faran'ny elanelam-potoana dia mijanona ny fitomboan'ny fonosana. Ny haben'ny elanelam-potoana dia faritana amin'ny alàlan'ny mari-pamantarana iray antsoina hoe elanelana batch;
  • Apache Spark SQL - manambatra ny fanodinana fifandraisana amin'ny Spark fandaharana miasa. Ny angon-drakitra voarafitra dia midika hoe angona manana schema, izany hoe, sehatra tokana ho an'ny rakitra rehetra. Ny Spark SQL dia manohana ny fampidirana avy amin'ny loharanom-baovao voarafitra isan-karazany ary, noho ny fisian'ny fampahafantarana momba ny skema, dia afaka maka amin'ny fomba mahomby ihany ny saha ilaina amin'ny firaketana, ary manome API DataFrame ihany koa;
  • AWS RDS dia angon-drakitra fifandraisana mifototra amin'ny rahona mora vidy, serivisy tranonkala izay manamora ny fametrahana, ny fampandehanana ary ny fanamafisam-peo, ary tantanan'i Amazon mivantana.

Fametrahana sy fampandehanana ny mpizara Kafka

Alohan'ny hampiasana mivantana an'i Kafka dia mila mahazo antoka ianao fa manana Java, satria ... JVM dia ampiasaina amin'ny asa:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Andao hamorona mpampiasa vaovao hiara-hiasa amin'i Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Avy eo, alao ny fizarana avy amin'ny tranokala ofisialy Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Esory ny rakitra alaina:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Tsy voatery ny dingana manaraka. Ny zava-misy dia tsy mamela anao hampiasa tanteraka ny fahaiza-manaon'ny Apache Kafka ny fametrahana default. Ohatra, fafao lohahevitra, sokajy, vondrona izay ahafahana mamoaka hafatra. Raha hanova izany dia andao hanova ny fisie fanamafisana:

vim ~/kafka/config/server.properties

Ampio amin'ny faran'ny rakitra ity manaraka ity:

delete.topic.enable = true

Alohan'ny hanombohana ny mpizara Kafka dia mila manomboka ny mpizara ZooKeeper ianao; hampiasa ny script fanampiny miaraka amin'ny fizarana Kafka izahay:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Rehefa nanomboka soa aman-tsara ny ZooKeeper dia atombohy amin'ny terminal mitokana ny mpizara Kafka:

bin/kafka-server-start.sh config/server.properties

Andao hamorona lohahevitra vaovao antsoina hoe Transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Andao ho azo antoka fa nisy lohahevitra iray misy ny isa ilaina amin'ny fizarazarana sy ny famerenana:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Andeha hojerentsika ny fotoana hitsapana ny mpamokatra sy ny mpanjifa amin'ny lohahevitra vao noforonina. Ny antsipiriany bebe kokoa momba ny fomba ahafahanao manandrana mandefa sy mandray hafatra dia voasoratra ao amin'ny antontan-taratasy ofisialy - Mandefasa hafatra sasany. Eny, mandroso amin'ny fanoratana mpamokatra amin'ny Python mampiasa ny KafkaProducer API izahay.

Manoratra ny mpamokatra

Ny mpamokatra dia hamokatra angona kisendrasendra - hafatra 100 isa-tsegondra. Amin'ny angon-drakitra kisendrasendra dia midika hoe rakibolana misy saha telo:

  • sampana - anaran'ny toeram-pivarotana ny andrim-panjakana findramam-bola;
  • Sandam-bola - vola amin'ny fifanakalozana;
  • vola - volan'ny fifampiraharahana. Ny vola dia isa tsara raha mividy vola amin'ny Banky, ary isa ratsy raha fivarotana.

Toy izao ny code ho an'ny mpamokatra:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Manaraka, mampiasa ny fomba fandefasana, mandefa hafatra amin'ny mpizara izahay, amin'ny lohahevitra ilaintsika, amin'ny endrika JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Rehefa mihazakazaka ny script dia mahazo ireto hafatra manaraka ireto ao amin'ny terminal izahay:

Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Midika izany fa mandeha araka izay irintsika ny zava-drehetra - mamokatra sy mandefa hafatra amin'ny lohahevitra ilaintsika ny mpamokatra.
Ny dingana manaraka dia ny fametrahana Spark sy ny fanodinana ity hafatra ity.

Fametrahana Apache Spark

Apache Spark dia sehatra informatika kluster manerantany sy mahomby.

Mandeha tsara kokoa noho ny fampiharana malaza amin'ny maodely MapReduce ny Spark sady manohana karazana kajy midadasika kokoa, ao anatin'izany ny fanontaniana mifampiresaka sy ny fanodinana stream. Ny hafainganam-pandeha dia mitana anjara toerana lehibe amin'ny fanodinana angon-drakitra be dia be, satria ny hafainganam-pandeha dia ahafahanao miasa amin'ny interactive nefa tsy mandany minitra na ora miandry. Ny iray amin'ireo tanjaky ny Spark lehibe indrindra mahatonga azy haingana dia ny fahaizany manao kajy ao anaty fitadidiana.

Ity rafitra ity dia voasoratra amin'ny Scala, ka mila mametraka azy aloha ianao:

sudo apt-get install scala

Ampidino ny fizarana Spark amin'ny tranokala ofisialy:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Esory ny rakitra:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Ampio ny lalana mankany Spark amin'ny rakitra bash:

vim ~/.bashrc

Ampio ireto andalana manaraka ireto amin'ny alàlan'ny mpamoaka lahatsoratra:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Alefaso ny baiko etsy ambany rehefa avy nanao fanovana amin'ny bashrc:

source ~/.bashrc

Mampiasa AWS PostgreSQL

Ny hany sisa tavela dia ny fametrahana ny angon-drakitra izay hampidiranay ny vaovao voarindra avy amin'ny renirano. Amin'izany dia hampiasa ny serivisy AWS RDS izahay.

Mandehana any amin'ny console AWS -> AWS RDS -> Databases -> Mamorona tahiry:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Safidio ny PostgreSQL ary tsindrio Next:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

SATRIA Ity ohatra ity dia natao ho an'ny tanjona fanabeazana ihany; hampiasa mpizara maimaim-poana izahay "farafaharatsiny" (Tier Free):
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Manaraka izany dia asiana marika ao amin'ny sakana Free Tier, ary aorian'izay dia hatolotra ho azy ny ohatra iray amin'ny kilasy t2.micro - na dia malemy aza, dia maimaim-poana ary mety amin'ny asantsika:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Avy eo dia tonga ny zavatra tena manan-danja: ny anaran'ny ohatra amin'ny database, ny anaran'ny mpampiasa master sy ny tenimiafina. Andeha hotononintsika ny ohatra: myHabrTest, mpampiasa master: habr, tenimiafina: habr12345 ary tsindrio ny bokotra Next:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Eo amin'ny pejy manaraka dia misy masontsivana tompon'andraikitra amin'ny fidiran'ny mpizara angona avy any ivelany (Accessibilité public) sy ny fisian'ny seranan-tsambo:

Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Andao hamorona sehatra vaovao ho an'ny vondrona fiarovana VPC, izay ahafahan'ny fidirana ivelany amin'ny lohamilina angona amin'ny alàlan'ny seranan-tsambo 5432 (PostgreSQL).
Andao ho any amin'ny console AWS amin'ny varavarankely navigateur mitokana mankany amin'ny Dashboard VPC -> Vondrona fiarovana -> Mamorona fizarana vondrona fiarovana:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Nametraka anarana ho an'ny vondrona Security - PostgreSQL, famaritana, manondro izay VPC tokony hifandraisana amin'ity vondrona ity ary tsindrio ny bokotra Create:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Fenoy ny fitsipika Inbound ho an'ny port 5432 ho an'ny vondrona vao noforonina, araka ny aseho amin'ny sary etsy ambany. Tsy azonao atao ny mamaritra ny seranan-tsambo amin'ny tananao, fa mifidiana PostgreSQL avy amin'ny lisitra midina Type.

Raha ny marina, ny sanda :: / 0 dia midika ny fisian'ny fifamoivoizana miditra amin'ny mpizara manerana izao tontolo izao, izay tsy marina tanteraka amin'ny kanônika, fa mba hamakafaka ny ohatra, andao isika hampiasa ity fomba ity:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Miverina any amin'ny pejin'ny navigateur izahay, izay misy ny "Configure advanced settings" misokatra ary safidio ao amin'ny fizarana vondrona fiarovana VPC -> Mifidiana vondrona fiarovana VPC efa misy -> PostgreSQL:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Manaraka, ao amin'ny safidy Database -> anarana database -> mametraka ny anarana - habrDB.

Afaka mamela ny masontsivana sisa isika, afa-tsy ny fanafoanana ny backup (fe-potoana fitahirizana backup - 0 andro), ny fanaraha-maso ary ny Performance Insights, amin'ny alàlan'ny default. Tsindrio ny bokotra Mamorona angona:
Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Mpitantana kofehy

Ny dingana farany dia ny fampivoarana asa Spark, izay handrindra ny angona vaovao avy amin'ny Kafka isaky ny roa segondra ary hampiditra ny valiny ao anaty angon-drakitra.

Araka ny nomarihina etsy ambony, ny toeram-pisavana dia rafitra fototra ao amin'ny SparkStreaming izay tsy maintsy amboarina mba hiantohana ny fandeferana diso. Hampiasa toeram-pisavana izahay ary, raha tsy mahomby ny fomba fiasa, ny maody Spark Streaming dia mila miverina any amin'ny toby fisavana farany ary manohy ny kajikajy avy aminy mba hamerenana ny angona very.

Ny fisavana dia azo atao amin'ny alalan'ny fametrahana lahatahiry amin'ny rafitra fichier mahazaka fahadisoana sy azo itokisana (toy ny HDFS, S3, sns.) izay hitehirizana ny mombamomba ny toeram-pisavana. Izany dia atao amin'ny fampiasana, ohatra:

streamingContext.checkpoint(checkpointDirectory)

Amin'ny ohatra ataontsika dia hampiasa ity fomba fiasa manaraka ity isika, izany hoe, raha misy ny checkpointDirectory, dia haverina avy amin'ny data checkpoint ny contexte. Raha tsy misy ny lahatahiry (izany hoe nofoanana voalohany), dia antsoina ny functionToCreateContext mba hamorona contexte vaovao sy hanitsy ny DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Mamorona zavatra DirectStream izahay mba hifandraisana amin'ny lohahevitra "transaction" amin'ny fampiasana ny fomba createDirectStream an'ny tranomboky KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Famaritana ny angona miditra amin'ny endrika JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Amin'ny fampiasana Spark SQL, manao vondrona tsotra izahay ary mampiseho ny valiny ao amin'ny console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Mahazo ny lahatsoratry ny fangatahana ary mandeha amin'ny Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Ary avy eo dia tehirizinay ao anaty latabatra ao amin'ny AWS RDS ny angon-drakitra natambatra. Mba hitahiry ny valin'ny fanangonana amin'ny latabatra database dia hampiasa ny fomba fanoratana ny zavatra DataFrame izahay:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Teny vitsivitsy momba ny fametrahana fifandraisana amin'ny AWS RDS. Namorona ny mpampiasa sy ny tenimiafina ho azy io izahay tamin'ny dingana "Mampivelatra ny AWS PostgreSQL". Tokony hampiasainao ny Endpoint ho url server database, izay aseho ao amin'ny fizarana Connectivity & Security:

Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Mba hampifandraisana tsara an'i Spark sy Kafka dia tokony hampandeha ny asa amin'ny alàlan'ny smark-submit ianao amin'ny fampiasana ny artifact. spark-streaming-kafka-0-8_2.11. Ho fanampin'izany, hampiasa artifact koa izahay hifaneraserana amin'ny angon-drakitra PostgreSQL; hamindra azy ireo amin'ny alàlan'ny --packages.

Ho an'ny fahafahan'ny script, dia hampidirinay ho toy ny mari-pamantarana fampidirana ny anaran'ny mpizara hafatra sy ny lohahevitra tiantsika handraisana data.

Noho izany, fotoana izao hanombohana sy hijerena ny fiasan'ny rafitra:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Nilamina ny zava-drehetra! Araka ny hitanao eo amin'ny sary etsy ambany, raha mandeha ny fampiharana, dia mivoaka isaky ny 2 segondra ny valin'ny fanangonana vaovao, satria nametraka ny elanelan'ny batching ho 2 segondra izahay rehefa namorona ny zavatra StreamingContext:

Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

Manaraka, manao fanontaniana tsotra amin'ny angon-drakitra izahay mba hanamarinana ny fisian'ny rakitra ao amin'ny latabatra transaction_flow:

Apache Kafka sy fanodinana angon-drakitra mivantana miaraka amin'ny Spark Streaming

famaranana

Ity lahatsoratra ity dia nijery ohatra iray amin'ny fanodinana fampahalalana amin'ny alàlan'ny Spark Streaming miaraka amin'i Apache Kafka sy PostgreSQL. Miaraka amin'ny fitomboan'ny angon-drakitra avy amin'ny loharano isan-karazany, sarotra ny manombantombana ny lanjan'ny Spark Streaming amin'ny famoronana fampiharana mivantana sy amin'ny fotoana tena izy.

Azonao atao ny mahita ny kaody loharano feno ao amin'ny fitahirizanao ao GitHub.

Faly aho miresaka momba ity lahatsoratra ity, manantena ny fanehoan-kevitrao aho, ary manantena koa ny fanakianana manorina avy amin'ny mpamaky rehetra.

Maniry anao hahita fahombiazana aho!

Sal. Tany am-boalohany dia nokasaina hampiasa angon-drakitra PostgreSQL eo an-toerana, saingy noho ny fitiavako an'i AWS dia nanapa-kevitra ny hamindra ny angona ho any amin'ny rahona aho. Ao amin'ny lahatsoratra manaraka momba ity lohahevitra ity dia hasehoko ny fomba fampiharana ny rafitra manontolo voalaza etsy ambony ao amin'ny AWS amin'ny fampiasana AWS Kinesis sy AWS EMR. Araho ny vaovao!

Source: www.habr.com

Add a comment