Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Hello, Habr! Illum se nibnu sistema li tipproċessa flussi ta' messaġġi Apache Kafka billi tuża Spark Streaming u niktbu r-riżultati tal-ipproċessar fid-database tal-cloud AWS RDS.

Ejja nimmaġinaw li ċerta istituzzjoni ta’ kreditu tagħtina l-kompitu li nipproċessaw transazzjonijiet deħlin "fuq il-ħin" fil-fergħat kollha tagħha. Dan jista’ jsir bil-għan li tiġi kkalkulata fil-pront pożizzjoni ta’ munita miftuħa għat-teżor, limiti jew riżultati finanzjarji għal tranżazzjonijiet, eċċ.

Kif timplimenta dan il-każ mingħajr l-użu ta 'maġija u perjodi ta' maġija - aqra taħt il-qatgħa! Mur!

Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming
(Sors tal-immaġni)

Introduzzjoni

Naturalment, l-ipproċessar ta 'ammont kbir ta' dejta f'ħin reali jipprovdi opportunitajiet biżżejjed għall-użu f'sistemi moderni. Waħda mill-kombinazzjonijiet l-aktar popolari għal dan hija t-tandem ta 'Apache Kafka u Spark Streaming, fejn Kafka joħloq fluss ta' pakketti ta 'messaġġi deħlin, u Spark Streaming jipproċessa dawn il-pakketti f'intervall ta' ħin partikolari.

Biex tiżdied it-tolleranza tal-ħsarat tal-applikazzjoni, se nużaw punti ta 'kontroll. B'dan il-mekkaniżmu, meta l-magna Spark Streaming teħtieġ tirkupra d-dejta mitlufa, jeħtieġ biss li tmur lura għall-aħħar punt ta 'kontroll u terġa' tibda l-kalkoli minn hemm.

Arkitettura tas-sistema żviluppata

Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Komponenti użati:

  • Apache Kafka hija sistema mqassma ta' messaġġi ta' pubblikazzjoni-abbonament. Adattat kemm għall-konsum tal-messaġġi offline kif ukoll onlajn. Biex jiġi evitat it-telf tad-dejta, il-messaġġi Kafka jinħażnu fuq disk u jiġu replikati fi ħdan il-cluster. Is-sistema Kafka hija mibnija fuq is-servizz ta 'sinkronizzazzjoni ZooKeeper;
  • Apache Spark Streaming - Komponent Spark għall-ipproċessar tad-dejta tal-istrimjar. Il-modulu Spark Streaming huwa mibni bl-użu ta 'arkitettura mikro-lott, fejn il-fluss tad-dejta huwa interpretat bħala sekwenza kontinwa ta' pakketti ta 'dejta żgħar. Spark Streaming jieħu data minn sorsi differenti u jgħaqqadha f'pakketti żgħar. Jinħolqu pakketti ġodda f'intervalli regolari. Fil-bidu ta 'kull intervall ta' ħin, jinħoloq pakkett ġdid, u kwalunkwe data riċevuta matul dak l-intervall hija inkluża fil-pakkett. Fl-aħħar tal-intervall, it-tkabbir tal-pakkett jieqaf. Id-daqs tal-intervall huwa determinat minn parametru msejjaħ l-intervall tal-lott;
  • Apache Spark SQL - tgħaqqad l-ipproċessar relazzjonali ma 'programmazzjoni funzjonali Spark. Data strutturata tfisser data li għandha schema, jiġifieri sett wieħed ta' oqsma għar-rekords kollha. Spark SQL jappoġġja input minn varjetà ta 'sorsi ta' dejta strutturati u, grazzi għad-disponibbiltà ta 'informazzjoni ta' skema, jista 'jirkupra b'mod effiċjenti biss l-oqsma meħtieġa ta' rekords, u jipprovdi wkoll DataFrame APIs;
  • AWS RDS hija database relazzjonali relattivament irħisa bbażata fuq il-cloud, servizz tal-web li jissimplifika s-setup, it-tħaddim u l-iskala, u huwa amministrat direttament mill-Amazon.

L-installazzjoni u t-tħaddim tas-server Kafka

Qabel ma tuża Kafka direttament, trid tiżgura ruħek li għandek Java, għax... JVM jintuża għax-xogħol:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Ejja noħolqu utent ġdid biex jaħdem ma' Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Sussegwentement, niżżel id-distribuzzjoni mill-websajt uffiċjali ta' Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Spakkja l-arkivju mniżżel:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Il-pass li jmiss huwa fakultattiv. Il-fatt hu li s-settings default ma jippermettux li tuża bis-sħiħ il-kapaċitajiet kollha ta 'Apache Kafka. Pereżempju, ħassar suġġett, kategorija, grupp li għalihom jistgħu jiġu ppubblikati messaġġi. Biex tibdel dan, ejja neditjaw il-fajl tal-konfigurazzjoni:

vim ~/kafka/config/server.properties

Żid dan li ġej fl-aħħar tal-fajl:

delete.topic.enable = true

Qabel ma tibda s-server Kafka, għandek bżonn tibda s-server ZooKeeper; se nużaw l-iskritt awżiljarju li jiġi mad-distribuzzjoni Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Wara li ZooKeeper beda b'suċċess, iniedi s-server Kafka f'terminal separat:

bin/kafka-server-start.sh config/server.properties

Ejja noħolqu suġġett ġdid imsejjaħ Transazzjoni:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Ejja niżguraw li jkun inħoloq suġġett bin-numru meħtieġ ta' diviżorji u replikazzjoni:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Ejja nitilfu l-mumenti tal-ittestjar tal-produttur u l-konsumatur għas-suġġett maħluq ġdid. Aktar dettalji dwar kif tista' tittestja li tibgħat u tirċievi messaġġi huma miktuba fid-dokumentazzjoni uffiċjali - Ibgħat xi messaġġi. Ukoll, ngħaddu biex niktbu produttur f'Python billi tuża l-API KafkaProducer.

Kitba tal-produttur

Il-produttur se jiġġenera data każwali - 100 messaġġ kull sekonda. B’dejta każwali nifhmu dizzjunarju li jikkonsisti fi tliet oqsma:

  • Fergħa — isem il-punt tal-bejgħ tal-istituzzjoni ta’ kreditu;
  • munita — munita tat-tranżazzjoni;
  • ammont — l-ammont tat-tranżazzjoni. L-ammont ikun numru pożittiv jekk ikun xiri ta’ munita mill-Bank, u numru negattiv jekk ikun bejgħ.

Il-kodiċi għall-produttur jidher bħal dan:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Sussegwentement, bl-użu tal-metodu ibgħat, nibagħtu messaġġ lis-server, għas-suġġett li neħtieġu, fil-format JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Meta tmexxi l-iskrittura, nirċievu l-messaġġi li ġejjin fit-terminal:

Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Dan ifisser li kollox jaħdem kif ridna - il-produttur jiġġenera u jibgħat messaġġi għas-suġġett li għandna bżonn.
Il-pass li jmiss huwa li tinstalla Spark u tipproċessa dan il-fluss tal-messaġġ.

Installazzjoni ta' Apache Spark

Apache Spark hija pjattaforma tal-kompjuters cluster universali u ta 'prestazzjoni għolja.

Spark jaħdem aħjar minn implimentazzjonijiet popolari tal-mudell MapReduce filwaqt li jappoġġja firxa usa 'ta' tipi ta 'komputazzjoni, inklużi mistoqsijiet interattivi u proċessar ta' stream. Il-veloċità għandha rwol importanti meta tipproċessa ammonti kbar ta 'dejta, peress li hija l-veloċità li tippermettilek taħdem b'mod interattiv mingħajr ma tqatta' minuti jew sigħat tistenna. Waħda mill-akbar saħħiet ta 'Spark li tagħmilha daqshekk mgħaġġla hija l-abbiltà tagħha li twettaq kalkoli fil-memorja.

Dan il-qafas huwa miktub fi Scala, għalhekk għandek bżonn tinstallah l-ewwel:

sudo apt-get install scala

Niżżel id-distribuzzjoni Spark mill-websajt uffiċjali:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Spakkja l-arkivju:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Żid il-mogħdija għal Spark mal-fajl bash:

vim ~/.bashrc

Żid il-linji li ġejjin permezz tal-editur:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Mexxi l-kmand hawn taħt wara li tagħmel bidliet fil-bashrc:

source ~/.bashrc

L-iskjerament ta' AWS PostgreSQL

Li jibqa 'huwa li tiskjera d-database li fiha se ntellgħu l-informazzjoni pproċessata mill-flussi. Għal dan se nużaw is-servizz AWS RDS.

Mur fil-console AWS -> AWS RDS -> Databases -> Oħloq database:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Agħżel PostgreSQL u kklikkja Li jmiss:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Għax Dan l-eżempju huwa għal skopijiet edukattivi biss; se nużaw server b'xejn "mill-inqas" (Livell Ħieles):
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Sussegwentement, inpoġġu qurdien fil-blokk Free Tier, u wara dan se nkunu awtomatikament offruti eżempju tal-klassi t2.micro - għalkemm dgħajfa, hija b'xejn u pjuttost adattata għall-kompitu tagħna:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Wara jiġu affarijiet importanti ħafna: l-isem tal-istanza tad-database, l-isem tal-utent prinċipali u l-password tiegħu. Ejja nsemmu l-istanza: myHabrTest, utent ewlieni: habr, password: habr12345 u kklikkja fuq il-buttuna Li jmiss:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Fil-paġna li jmiss hemm parametri responsabbli għall-aċċessibbiltà tas-server tad-database tagħna minn barra (aċċessibbiltà pubblika) u d-disponibbiltà tal-port:

Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Ejja noħolqu setting ġdid għall-grupp tas-sigurtà VPC, li se jippermetti aċċess estern għas-server tad-database tagħna permezz tal-port 5432 (PostgreSQL).
Ejja mmorru għall-console AWS f'tieqa separata tal-brawżer għad-Dashboard tal-VPC -> Gruppi tas-Sigurtà -> Oħloq taqsima tal-grupp tas-sigurtà:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Aħna nissettjaw l-isem għall-grupp tas-Sigurtà - PostgreSQL, deskrizzjoni, indika liema VPC għandu jkun assoċjat miegħu dan il-grupp u kklikkja l-buttuna Oħloq:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Imla r-regoli Inbound għall-port 5432 għall-grupp maħluq ġdid, kif muri fl-istampa hawn taħt. Ma tistax tispeċifika l-port manwalment, iżda agħżel PostgreSQL mil-lista drop-down Tip.

B'mod strett, il-valur ::/0 ifisser id-disponibbiltà tat-traffiku deħlin lejn is-server minn madwar id-dinja, li kanonikament mhux għal kollox veru, iżda biex tanalizza l-eżempju, ejja nħallu lilna nfusna nużaw dan l-approċċ:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Nirritornaw għall-paġna tal-browser, fejn għandna "Ikkonfigura s-settings avvanzati" miftuħa u agħżel fit-taqsima tal-gruppi tas-sigurtà VPC -> Agħżel gruppi ta 'sigurtà VPC eżistenti -> PostgreSQL:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Sussegwentement, fl-għażliet tad-Database -> Isem tad-Database -> issettja l-isem - habrDB.

Nistgħu nħallu l-parametri li jifdal, bl-eċċezzjoni tad-diżattivazzjoni tal-backup (perjodu ta 'żamma ta' backup - jiem 0), monitoraġġ u Performance Insights, b'mod awtomatiku. Ikklikkja fuq il-buttuna Oħloq database:
Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Maniġer tal-ħajt

L-istadju finali se jkun l-iżvilupp ta 'xogħol Spark, li se jipproċessa dejta ġdida li tiġi minn Kafka kull żewġ sekondi u jdaħħal ir-riżultat fid-database.

Kif innutat hawn fuq, il-punti ta 'kontroll huma mekkaniżmu ewlieni fi SparkStreaming li għandu jiġi kkonfigurat biex jiżgura t-tolleranza tal-ħsarat. Se nużaw punti ta 'kontroll u, jekk il-proċedura tfalli, il-modulu Spark Streaming ikollu bżonn biss li jerġa' lura għall-aħħar punt ta 'kontroll u jerġa' jibda l-kalkoli minnu biex jirkupra d-dejta mitlufa.

Checkpointing jista 'jiġi attivat billi jiġi stabbilit direttorju fuq sistema ta' fajls affidabbli u tolleranti għall-ħsarat (bħal HDFS, S3, eċċ.) li fiha se tinħażen l-informazzjoni dwar il-punt ta 'kontroll. Dan isir bl-użu, pereżempju:

streamingContext.checkpoint(checkpointDirectory)

Fl-eżempju tagħna, se nużaw l-approċċ li ġej, jiġifieri, jekk jeżisti checkpointDirectory, allura l-kuntest jiġi rikrejat mid-data tal-punt ta 'kontroll. Jekk id-direttorju ma jeżistix (jiġifieri esegwit għall-ewwel darba), allura functionToCreateContext tissejjaħ biex toħloq kuntest ġdid u tikkonfigura DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Aħna noħolqu oġġett DirectStream biex nikkonnettjaw mas-suġġett "transazzjoni" billi tuża l-metodu createDirectStream tal-librerija KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing tad-dejta li tidħol fil-format JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Billi tuża Spark SQL, nagħmlu grupp sempliċi u nuru r-riżultat fil-console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Ikseb it-test tal-mistoqsija u tħaddem permezz ta' Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

U mbagħad nissejvjaw id-dejta aggregata li tirriżulta f'tabella f'AWS RDS. Biex issalva r-riżultati tal-aggregazzjoni f'tabella tad-database, se nużaw il-metodu tal-kitba tal-oġġett DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Ftit kelmiet dwar it-twaqqif ta' konnessjoni ma' AWS RDS. Ħloqna l-utent u l-password għaliha fil-pass "Deploying AWS PostgreSQL". Għandek tuża Endpoint bħala l-url tas-server tad-database, li tidher fit-taqsima Konnettività u sigurtà:

Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Sabiex tikkonnettja b'mod korrett Spark u Kafka, għandek tħaddem ix-xogħol permezz ta' smark-submit billi tuża l-artifact spark-streaming-kafka-0-8_2.11. Barra minn hekk, se nużaw ukoll artifact għall-interazzjoni mad-database PostgreSQL; se nittrasferixxuhom permezz ta '--packages.

Għall-flessibbiltà tal-iskrittura, aħna se ninkludu wkoll bħala parametri ta’ input l-isem tas-server tal-messaġġi u s-suġġett li minnu rridu nirċievu d-dejta.

Allura, wasal iż-żmien li tniedi u tiċċekkja l-funzjonalità tas-sistema:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Kollox ħadem! Kif tistgħu taraw fl-istampa hawn taħt, waqt li l-applikazzjoni tkun qed taħdem, joħorġu riżultati ġodda ta 'aggregazzjoni kull 2 ​​sekondi, għaliex aħna waqqafna l-intervall ta' batching għal 2 sekondi meta ħloqna l-oġġett StreamingContext:

Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Sussegwentement, nagħmlu mistoqsija sempliċi għad-database biex tivverifika l-preżenza tar-rekords fit-tabella fluss_transazzjoni:

Apache Kafka u Ipproċessar tad-Data Streaming bi Spark Streaming

Konklużjoni

Dan l-artikolu ħares lejn eżempju ta 'proċessar ta' fluss ta 'informazzjoni bl-użu ta' Spark Streaming flimkien ma 'Apache Kafka u PostgreSQL. Bit-tkabbir tad-dejta minn sorsi varji, huwa diffiċli li jiġi stmat iżżejjed il-valur prattiku ta 'Spark Streaming għall-ħolqien ta' streaming u applikazzjonijiet f'ħin reali.

Tista' ssib il-kodiċi tas-sors sħiħ fir-repożitorju tiegħi fuq GitHub.

Ninsab kuntent li niddiskuti dan l-artikolu, nistenna bil-ħerqa l-kummenti tiegħek, u nittama wkoll għal kritika kostruttiva mill-qarrejja kollha li jieħdu ħsieb.

Nixtieqek suċċess!

Ps. Inizjalment kien ippjanat li nuża database PostgreSQL lokali, iżda minħabba l-imħabba tiegħi għall-AWS, iddeċidejt li nmexxi d-database għall-cloud. Fl-artiklu li jmiss dwar dan is-suġġett, ser nuri kif nimplimenta s-sistema kollha deskritta hawn fuq f'AWS billi tuża AWS Kinesis u AWS EMR. Segwi l-aħbarijiet!

Sors: www.habr.com

Żid kumment