Apache Kafka ug Data Streaming nga adunay Spark Streaming

Hoy Habr! Karon maghimo kami usa ka sistema nga magproseso sa mga sapa sa mensahe sa Apache Kafka gamit ang Spark Streaming ug isulat ang resulta sa pagproseso sa database sa panganod sa AWS RDS.

Hunahunaa nga ang usa ka institusyon sa kredito nagbutang sa atong atubangan sa tahas sa pagproseso sa umaabot nga mga transaksyon "sa langaw" alang sa tanan nga mga sanga niini. Mahimo kini alang sa katuyoan nga dali nga makalkula ang bukas nga posisyon sa salapi alang sa tipiganan, mga limitasyon o resulta sa pinansyal sa mga transaksyon, ug uban pa.

Sa unsa nga paagi sa pagpatuman niini nga kaso nga walay paggamit sa salamangka ug salamangka spelling - basaha ubos sa cut! Lakaw!

Apache Kafka ug Data Streaming nga adunay Spark Streaming
(Gigikanan sa hulagway)

Pasiuna

Siyempre, ang real-time nga pagproseso sa usa ka dako nga kantidad sa datos naghatag og daghang mga oportunidad alang sa paggamit sa modernong mga sistema. Usa sa labing popular nga mga kombinasyon alang niini mao ang tandem sa Apache Kafka ug Spark Streaming, diin ang Kafka nagmugna og usa ka sapa sa umaabot nga mga pakete sa mensahe, ug ang Spark Streaming nagproseso niini nga mga pakete sa usa ka piho nga agwat sa panahon.

Aron mapauswag ang pagtugot sa sayup sa aplikasyon, mogamit kami mga checkpoints - checkpoints. Uban niini nga mekanismo, kung ang Spark Streaming module kinahanglan nga mabawi ang nawala nga datos, kinahanglan ra nga mobalik sa katapusan nga checkpoint ug ipadayon ang mga kalkulasyon gikan didto.

Ang arkitektura sa naugmad nga sistema

Apache Kafka ug Data Streaming nga adunay Spark Streaming

Gigamit nga mga sangkap:

  • Apache Kafka kay usa ka distributed publish-and-subscribe messaging system. Angayan alang sa offline ug online nga pagkonsumo sa mensahe. Aron mapugngan ang pagkawala sa datos, ang mga mensahe sa Kafka gitipigan sa disk ug gisundog sulod sa cluster. Ang sistema sa Kafka gitukod sa ibabaw sa serbisyo sa pag-synchronize sa ZooKeeper;
  • Apache Spark Streaming - usa ka sangkap sa Spark alang sa pagproseso sa streaming data. Ang Spark Streaming module gitukod gamit ang usa ka micro-batch nga arkitektura, kung ang usa ka data stream gihubad ingon usa ka padayon nga pagkasunod-sunod sa gagmay nga mga pakete sa datos. Ang Spark Streaming nagkuha ug datos gikan sa lain-laing mga tinubdan ug gihiusa kini ngadto sa gagmay nga mga batch. Ang mga bag-ong pakete gihimo sa regular nga mga agwat. Sa pagsugod sa matag agwat sa oras, usa ka bag-ong pakete ang gihimo, ug bisan unsang datos nga nadawat sa kana nga agwat gilakip sa pakete. Sa katapusan sa agwat, ang pagtubo sa pakete mohunong. Ang gidak-on sa agwat gitino pinaagi sa usa ka parametro nga gitawag ug batch interval;
  • Apache Spark SQL - Gihiusa ang relational nga pagproseso sa Spark functional programming. Ang structured data nagtumong sa datos nga adunay schema, nga mao, usa ka set sa field para sa tanang record. Ang Spark SQL nagsuporta sa input gikan sa lain-laing mga structured nga mga tinubdan sa datos ug, tungod sa presensya sa impormasyon sa schema, kini epektibo nga makakuha lamang sa gikinahanglan nga mga natad sa mga rekord, ug naghatag usab sa DataFrame API;
  • AWS RDS kay medyo barato nga cloud-based relational database, usa ka web service nga nagpasimple sa setup, operation, ug scaling, nga direktang gidumala sa Amazon.

Pag-instalar ug pagpadagan sa Kafka server

Sa dili pa gamiton ang Kafka direkta, kinahanglan nimo nga sigurohon nga ikaw adunay Java, tungod kay Ang JVM gigamit alang sa trabaho:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Magbuhat ta og bag-ong user nga motrabaho sa Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Sunod, i-download ang distribution kit gikan sa opisyal nga website sa Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack ang na-download nga archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Ang sunod nga lakang kay opsyonal. Ang tinuod mao nga ang mga default setting wala magtugot kanimo sa hingpit nga paggamit sa tanan nga mga bahin sa Apache Kafka. Pananglitan, kuhaa ang usa ka hilisgutan, kategorya, grupo, diin ang mga mensahe mahimong ma-publish. Aron usbon kini, atong usbon ang configuration file:

vim ~/kafka/config/server.properties

Idugang ang mosunod sa katapusan sa file:

delete.topic.enable = true

Sa wala pa magsugod ang Kafka server, kinahanglan nimo nga sugdan ang ZooKeeper server, among gamiton ang helper script nga kauban sa Kafka distribution:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Human nga malampuson nga nagsugod ang ZooKeeper, atong sugdan ang Kafka server sa bulag nga terminal:

bin/kafka-server-start.sh config/server.properties

Maghimo kita og bag-ong hilisgutan nga gitawag og Transaksyon:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Atong siguroon nga ang usa ka hilisgutan nga adunay gikinahanglan nga gidaghanon sa mga partisyon ug pagkopya nahimo na:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka ug Data Streaming nga adunay Spark Streaming

Atong kalimtan ang mga higayon sa pagsulay sa prodyuser ug konsumidor alang sa bag-ong nahimo nga hilisgutan. Ang dugang nga mga detalye kung giunsa nimo pagsulay ang pagpadala ug pagdawat sa mga mensahe gisulat sa opisyal nga dokumentasyon - Pagpadala pipila ka mga mensahe. Aw, nagpadayon kami sa pagsulat sa usa ka prodyuser sa Python gamit ang KafkaProducer API.

Pagsulat sa producer

Ang prodyuser maghimo ug random nga datos - 100 ka mensahe matag segundo. Pinaagi sa random nga datos gipasabot namo ang usa ka diksyonaryo nga naglangkob sa tulo ka mga natad:

  • Sanga - ngalan sa punto sa pagbaligya sa institusyon sa kredito;
  • currency - currency sa transaksyon;
  • Kadaghanon - ang kantidad sa transaksyon. Ang kantidad mahimong positibo kung kini usa ka pagpalit sa salapi sa Bangko, ug negatibo kung kini usa ka pagbaligya.

Ang code alang sa prodyuser ingon niini:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Sunod, gamit ang paagi sa pagpadala, nagpadala kami usa ka mensahe sa server, sa hilisgutan nga kinahanglan namon, sa format nga JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Kung gipadagan ang script, makuha namon ang mga mosunud nga mensahe sa terminal:

Apache Kafka ug Data Streaming nga adunay Spark Streaming

Kini nagpasabut nga ang tanan molihok sumala sa among gusto - ang prodyuser naghimo ug nagpadala mga mensahe sa hilisgutan nga among gikinahanglan.
Ang sunod nga lakang mao ang pag-install sa Spark ug iproseso kini nga dagan sa mensahe.

Pag-instalar sa Apache Spark

Apache Spark usa ka versatile ug high performance cluster computing platform.

Ang Spark milabaw sa popular nga mga pagpatuman sa MapReduce nga modelo sa mga termino sa performance, samtang naghatag og suporta alang sa mas lapad nga mga matang sa computation, lakip ang interactive nga mga pangutana ug streaming. Ang katulin adunay hinungdanon nga papel sa pagproseso sa daghang mga datos, tungod kay kini ang katulin nga nagtugot kanimo sa pagtrabaho nga interactive nga wala’y paggasto mga minuto o oras sa paghulat. Usa sa pinakadako nga kalig-on sa Spark alang sa paghatud niini nga katulin mao ang abilidad niini sa paghimo sa mga kalkulasyon sa memorya.

Kini nga balangkas gisulat sa Scala, mao nga kinahanglan nimo kini i-install una:

sudo apt-get install scala

I-download ang Spark distribution gikan sa opisyal nga website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack ang archive:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Idugang ang dalan sa Spark sa bash file:

vim ~/.bashrc

Idugang ang mosunod nga mga linya pinaagi sa editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Pagdalagan ang sugo sa ubos human sa paghimo sa mga kausaban sa bashrc:

source ~/.bashrc

Pag-deploy sa AWS PostgreSQL

Nagpabilin kini sa pagpalapad sa database, diin atong pun-on ang giproseso nga impormasyon gikan sa mga sapa. Aron mahimo kini, among gamiton ang serbisyo sa AWS RDS.

Adto sa AWS console -> AWS RDS -> Mga Database -> Paghimo database:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Pilia ang PostgreSQL ug i-klik ang Next button:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Kay kini nga pananglitan gi-analisa alang lamang sa mga katuyoan sa edukasyon, mogamit kami usa ka libre nga server "sa labing gamay" (Libre nga Tier):
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Sunod, susiha ang kahon nga Libre nga Tier, ug pagkahuman awtomatiko nga itanyag ang usa ka pananglitan sa t2.micro nga klase - bisan kung huyang, apan libre ug angay alang sa among buluhaton:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Importante kaayo nga mga butang ang mosunod: ang ngalan sa DB nga pananglitan, ang ngalan sa master user ug ang iyang password. Tawgon nato ang pananglitan: myHabrTest, master user: habr, password: habr12345 ug i-klik ang Next button:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Ang sunod nga panid naglangkob sa mga parameter nga responsable sa pag-access sa among database server gikan sa gawas (Pag-access sa publiko) ug ang pagkaanaa sa mga pantalan:

Apache Kafka ug Data Streaming nga adunay Spark Streaming

Magbuhat ta ug bag-ong setting para sa grupo sa seguridad sa VPC, nga magtugot sa gawas nga pag-access sa among database server pinaagi sa port 5432 (PostgreSQL).
Adto ta sa bulag nga browser window sa AWS console sa VPC Dashboard -> Security Groups -> Create security group section:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Gibutang namon ang ngalan alang sa grupo sa Seguridad - PostgreSQL, usa ka paghulagway, ipiho kung unsang VPC kini nga grupo kinahanglan nga kauban ug i-klik ang Paghimo buton:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Gipuno namo ang bag-ong gibuhat nga grupo Inbound nga mga lagda alang sa port 5432, ingon sa gipakita sa hulagway sa ubos. Dili nimo matino ang port nga mano-mano, apan pilia ang PostgreSQL gikan sa Type nga drop-down list.

Sa estrikto nga pagkasulti, ang bili ::/0 nagpasabot sa pagkaanaa sa umaabot nga trapiko alang sa server gikan sa tibuok kalibutan, nga dili tinuod nga kanonically, apan aron pag-analisar sa pananglitan, atong gamiton kini nga paagi:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Mibalik kami sa panid sa browser, diin kami adunay bukas nga "I-configure ang mga advanced setting" ug pilia ang mga grupo sa seguridad sa VPC -> Pilia ang naa na nga mga grupo sa seguridad sa VPC -> PostgreSQL sa seksyon:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Sunod, sa seksyon Mga kapilian sa database -> Ngalan sa database -> itakda ang ngalan - habrDB.

Ang nahabilin nga mga parametro, gawas sa pag-disable sa backup (panahon sa pagpadayon sa backup - 0 ka adlaw), pag-monitor ug Performance Insights, mahimong ibilin pinaagi sa default. I-klik ang buton Paghimo database:
Apache Kafka ug Data Streaming nga adunay Spark Streaming

Stream Handler

Ang katapusan nga yugto mao ang pag-uswag sa usa ka trabaho sa Spark, nga magproseso sa bag-ong datos gikan sa Kafka matag duha ka segundo ug ipasulod ang resulta sa database.

Sama sa nahisgotan na sa ibabaw, ang mga checkpoints mao ang nag-unang mekanismo sa SparkStreaming nga kinahanglang i-configure aron makahatag og fault tolerance. Maggamit kami og mga checkpoint ug, kung adunay kapakyasan sa pamaagi, ang Spark Streaming module kinahanglan ra nga mobalik sa katapusan nga checkpoint ug ipadayon ang mga kalkulasyon gikan niini aron mabawi ang nawala nga datos.

Ang checkpoint mahimong ma-enable pinaagi sa pagbutang og direktoryo sa fault-tolerant, kasaligang file system (e.g. HDFS, S3, ug uban pa) diin ang impormasyon sa checkpoint itago. Gihimo kini, pananglitan:

streamingContext.checkpoint(checkpointDirectory)

Sa among pananglitan, among gamiton ang mosunod nga pamaagi, nga mao, kung ang checkpointDirectory anaa, nan ang konteksto pagabuhaton pag-usab gikan sa checkpoint data. Kung ang direktoryo wala maglungtad (pananglitan, kini gipatuman sa unang higayon), unya ang functionToCreateContext function gitawag aron sa paghimo og bag-ong konteksto ug pag-set up sa DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Naghimo kami og DirectStream nga butang aron makonektar sa "transaksyon" nga hilisgutan gamit ang createDirectStream nga pamaagi sa KafkaUtils library:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Pag-parse sa umaabot nga data sa JSON format:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Gamit ang Spark SQL, naghimo kami usa ka yano nga pag-grupo ug gi-output ang resulta sa console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Pagkuha sa lawas sa pangutana ug pagpadagan niini pinaagi sa Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Ug dayon among gitipigan ang nadawat nga aggregated data sa usa ka lamesa sa AWS RDS. Aron matipigan ang mga resulta sa aggregation sa usa ka database table, atong gamiton ang write method sa DataFrame object:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Pipila ka mga pulong bahin sa pag-set up og koneksyon sa AWS RDS. Gihimo namo ang user ug password alang niini sa lakang nga "Pag-deploy sa AWS PostgreSQL". Isip url sa database server, kinahanglan nimong gamiton ang Endpoint, nga gipakita sa Connectivity & security section:

Apache Kafka ug Data Streaming nga adunay Spark Streaming

Aron sa husto nga pagkonektar sa Spark ug Kafka, kinahanglan nimong ipadagan ang trabaho pinaagi sa smark-submit gamit ang artifact spark-streaming-kafka-0-8_2.11. Dugang pa, mogamit usab kami usa ka artifact alang sa pakig-uban sa database sa PostgreSQL, among ipasa kini pinaagi sa --packages.

Alang sa pagka-flexible sa script, kuhaon usab namon ang ngalan sa server sa mensahe ug ang hilisgutan diin gusto namon makadawat mga datos ingon mga parameter sa pag-input.

Busa, panahon na sa pagdagan ug pagsulay sa sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Nahuman ang tanan! Sama sa imong makita sa hulagway sa ubos, samtang ang aplikasyon nagdagan, ang mga bag-ong resulta sa aggregation gipakita matag 2 segundos, tungod kay among gibutang ang bundling interval ngadto sa 2 segundos sa dihang among gibuhat ang StreamingContext object:

Apache Kafka ug Data Streaming nga adunay Spark Streaming

Sunod, naghimo kami usa ka yano nga pangutana sa database aron susihon ang mga rekord sa lamesa transaction_flow:

Apache Kafka ug Data Streaming nga adunay Spark Streaming

konklusyon

Niini nga artikulo, usa ka pananglitan sa streaming nga pagproseso sa impormasyon gamit ang Spark Streaming inubanan sa Apache Kafka ug PostgreSQL gikonsiderar. Uban sa pag-uswag sa gidaghanon sa mga datos gikan sa nagkalain-laing mga tinubdan, lisud ang pag-overestimate sa praktikal nga bili sa Spark Streaming alang sa paghimo sa real-time ug streaming nga mga aplikasyon.

Makita nimo ang tibuok source code sa akong repository sa GitHub.

Nalipay ako nga hisgutan kini nga artikulo, nagpaabut ako sa imong mga komento, ug usab, nanghinaut ako nga adunay mapuslanon nga pagsaway gikan sa tanan nga nagpakabana nga mga magbabasa.

Manghinaut ko nga molampos ka!

Sal. Kini orihinal nga giplano nga mogamit sa usa ka lokal nga database sa PostgreSQL, apan tungod sa akong gugma sa AWS, nakahukom ko nga ibalhin ang database ngadto sa panganod. Sa sunod nga artikulo bahin niini nga hilisgutan, ipakita ko kanimo kung giunsa ipatuman ang tibuuk nga sistema nga gihulagway sa taas sa AWS gamit ang AWS Kinesis ug AWS EMR. Sunda ang balita!

Source: www.habr.com

Idugang sa usa ka comment