Apache Kafka and streaming data processing with Spark Streaming

Hello, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to an AWS RDS cloud database.

Let's imagine that a certain credit institution has tasked us with processing incoming transactions "on the fly" across all of its branches. This could be done in order to quickly calculate an open currency position for the treasury, limits or financial results for transactions, and so on.

How to implement this case without magic and magic spells: read under the cut! Go!

(image source)

Introduction

Of course, processing big data in real time opens up many possibilities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka creates a stream of incoming message packets and Spark Streaming processes those packets at a given time interval.

To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume calculations from there.

Architecture of the system being developed

(architecture diagram)

Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is a Spark component for processing streaming data. The Spark Streaming module is built using a micro-batch architecture, where the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming receives data from different sources and combines it into small batches. New batches are created at regular intervals. At the beginning of each interval a new batch is created, and any data received during that interval is included in the batch. At the end of the interval, the batch stops growing. The size of the interval is determined by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data here means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the availability of schema information, can efficiently retrieve only the required fields of records; it also provides DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation, and scaling, and is administered directly by Amazon.

Installing and running the Kafka server

Before using Kafka directly, you need to make sure you have Java, since the JVM is used for its operation:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The thing is, the default settings do not let you use all the features of Apache Kafka. For example, deleting a topic, category, or group to which messages can be published. To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that comes with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

After ZooKeeper has started successfully, launch the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that a topic with the required number of partitions and replication has been created:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

(screenshot)

We will skip the details of testing the producer and consumer for the newly created topic: how to test sending and receiving messages is described in the official documentation, under Send some messages. We move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data: 100 messages every second. By random data we mean a dictionary consisting of three fields:

  • branch: the name of the credit institution's point of sale;
  • currency: the transaction currency;
  • amount: the transaction amount. The amount will be a positive number if it is a purchase of currency by the Bank, and a negative number if it is a sale.

The code for the producer looks like this:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Next, using the send method, we send a message to the server, to the topic we need, in JSON format:

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: {}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
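The snippet above sends a single message, while the article states that the producer emits 100 messages every second. The driver loop is not shown in the text, so the following is only a hedged sketch of how it might look; generate_batch and run_producer are names introduced here for illustration, and the batch helper uses the standard library random module (rather than numpy) so the sketch stays self-contained:

```python
import time
from random import choice, randint

def generate_batch(n=100):
    # Build n random transaction dicts with the same three fields
    # that get_random_value() produces.
    branches = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currencies = ['RUB', 'USD', 'EUR', 'GBP']
    return [{'branch': choice(branches),
             'currency': choice(currencies),
             'amount': randint(-100, 100)} for _ in range(n)]

def run_producer(topic='transaction', bootstrap='localhost:9092'):
    # Requires a running Kafka broker and the kafka-python package;
    # the imports are kept local so generate_batch() works without them.
    from json import dumps
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=[bootstrap],
                             value_serializer=lambda x: dumps(x).encode('utf-8'),
                             compression_type='gzip')
    while True:
        for msg in generate_batch(100):   # roughly 100 messages...
            producer.send(topic, value=msg)
        producer.flush()
        time.sleep(1)                     # ...per second (approximate pacing)
```

Calling run_producer() then keeps feeding the transaction topic until the process is stopped.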

When running the script, we get the following messages in the terminal:

(screenshot)

This means that everything works as intended: the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a universal, high-performance cluster computing platform.

Spark performs better than popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing big data, since it is speed that allows you to work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform in-memory computations.

This framework is written in Scala, so you need to install it first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Add the path to Spark to your bash profile:

vim ~/.bashrc

Add the following lines using the editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making the changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will upload the processed information from the streams. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database:
(screenshot)

Select PostgreSQL and click Next:
(screenshot)

Since this example is for educational purposes only, we will use a free "minimal" server (Free Tier):
(screenshot)

Next, check the Free Tier box, after which we will automatically be offered an instance of the t2.micro class: although it is not powerful, it is free and quite suitable for our task:
(screenshot)

Next come some very important things: the database instance name, the master user name, and its password. Let's name the instance myHabrTest, master user: habr, password: habr12345, and click Next:
(screenshot)

On the next page are the parameters responsible for accessing our database server from outside (Public accessibility) and port availability:

(screenshot)

Let's create a new setting for the VPC security group, which will allow external access to our database server via port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console, to the VPC Dashboard -> Security Groups -> Create security group section:
(screenshot)

We set the name for the Security group (PostgreSQL), add a description, indicate which VPC this group should be associated with, and click the Create button:
(screenshot)

Fill in the Inbound rules for port 5432 for the newly created group, as shown in the picture below. You don't need to specify the port manually: just select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means incoming traffic to the server is allowed from anywhere in the world, which is not entirely correct, but for the purposes of this example, let's allow ourselves this approach:
(screenshot)

We return to the browser page where "Configure advanced settings" is open and, in the VPC security groups section, select Choose existing VPC security groups -> PostgreSQL:
(screenshot)

Next, in Database options -> Database name, set the name: habrDB.

We can leave the remaining parameters at their defaults, apart from disabling backup (backup retention period: 0 days), monitoring, and Performance Insights. Click the Create database button:
(screenshot)

Stream processing

The final stage is the development of a Spark job that will process new data arriving from Kafka every two seconds and write the result to the database.

As noted above, checkpoints are a core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints so that, if the procedure fails, the Spark Streaming module only needs to return to the last checkpoint and resume calculations from it in order to recover the lost data.

Checkpointing can be enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. This is done with, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example, we will use the following approach: if checkpointDirectory exists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., this is the first run), then functionToCreateContext is called to create a new context and configure the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
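The body of functionToCreateContext is not shown in the article. A minimal sketch might look as follows; the application name and checkpoint path are assumptions made for illustration, and the pyspark imports are kept inside the function so the file can be inspected without Spark installed:

```python
# Hypothetical checkpoint location; in production this should live on a
# fault-tolerant file system such as HDFS or S3.
checkpointDirectory = '/tmp/transaction_checkpoint'

def functionToCreateContext():
    # Called by StreamingContext.getOrCreate() only when no checkpoint
    # data exists yet in checkpointDirectory.
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = SparkConf().setAppName('transaction-stream')  # assumed app name
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)            # 2-second batch interval
    # ... DStream creation and processing logic is configured here ...
    ssc.checkpoint(checkpointDirectory)      # enable checkpointing
    return ssc
```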

We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing the incoming data in JSON format:

from pyspark.sql import Row

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
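createDirectStream delivers (key, value) pairs in which the value is still a JSON string, so before the Row mapping above can run, each message has to be deserialized. The sketch below shows one hedged way to wire this up inside a foreachRDD callback; parse_record and process are names introduced here for illustration, not taken from the article:

```python
import json

def parse_record(raw_value):
    # Deserialize one Kafka message value (a JSON string) into a dict
    # with the branch / currency / amount fields.
    return json.loads(raw_value)

def process(time, rdd):
    # Hypothetical foreachRDD callback: drop the Kafka key, parse the
    # JSON value, then build Rows exactly as in the snippet above.
    from pyspark.sql import Row, SparkSession
    spark = SparkSession.builder.getOrCreate()
    parsed = rdd.map(lambda kv: parse_record(kv[1]))
    rowRdd = parsed.map(lambda w: Row(branch=w['branch'],
                                      currency=w['currency'],
                                      amount=w['amount']))
    testDataFrame = spark.createDataFrame(rowRdd)
    testDataFrame.createOrReplaceTempView("treasury_stream")

# The callback would be registered on the stream with:
# directKafkaStream.foreachRDD(process)
```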

Using Spark SQL, we perform a simple grouping and display the result in the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
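The get_sql_query helper used here is not shown in the article; presumably it simply returns the query text from the previous listing, e.g.:

```python
def get_sql_query():
    # Return the per-batch aggregation query shown earlier.
    return """
    select
        from_unixtime(unix_timestamp()) as curr_time,
        t.branch                        as branch_name,
        t.currency                      as currency_code,
        sum(amount)                     as batch_value
    from treasury_stream t
    group by
        t.branch,
        t.currency
    """
```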

And then we save the resulting aggregated data into a table in AWS RDS. To save the aggregation results to a database table, we will use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. As the database server url you should use the Endpoint, which is displayed in the Connectivity & security section:

(screenshot)

In order to connect Spark and Kafka correctly, you should run the job via spark-submit using the spark-streaming-kafka-0-8_2.11 artifact. In addition, we will also use an artifact for interacting with the PostgreSQL database; we will pass both via --packages.

For flexibility of the script, we will also include as input parameters the name of the message server and the topic from which we want to receive data.
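The article does not show how the script reads these parameters; a minimal sketch using sys.argv (the function name is assumed) could be:

```python
import sys

def parse_args(argv):
    # Expect exactly two positional arguments after the script name:
    # the broker list and the topic, e.g.
    #   spark_job.py localhost:9092 transaction
    if len(argv) != 3:
        raise SystemExit('Usage: spark_job.py <broker_list> <topic>')
    return argv[1], argv[2]

# Inside spark_job.py:
# broker_list, topic = parse_args(sys.argv)
```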

So, it's time to launch and check the functionality of the system:

spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
    spark_job.py localhost:9092 transaction

Everything worked out! As you can see in the picture below, while the application is running, new aggregation results are output every 2 seconds, because we set the batch interval to 2 seconds when we created the StreamingContext object:

(screenshot)

Next, we make a simple query to the database to check for records in the transaction_flow table:

(screenshot)
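Such a check can also be scripted. The sketch below is an assumption about how one might query the table from Python using psycopg2 (a library not used elsewhere in the article); the host would be the RDS Endpoint mentioned earlier:

```python
CHECK_QUERY = 'select count(*) from transaction_flow'

def count_rows(host, dbname='habrDB', user='habr', password='habr12345'):
    # Requires the psycopg2 package (pip install psycopg2-binary);
    # the import is kept local so CHECK_QUERY is usable without it.
    import psycopg2
    conn = psycopg2.connect(host=host, port=5432, dbname=dbname,
                            user=user, password=password)
    try:
        with conn.cursor() as cur:
            cur.execute(CHECK_QUERY)
            return cur.fetchone()[0]
    finally:
        conn.close()
```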

Conclusion

This article looked at an example of stream processing of information using Spark Streaming in combination with Apache Kafka and PostgreSQL. With the growth of data from various sources, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all caring readers.

I wish you success!

P.S. Initially it was planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Follow the news!

Source: www.habr.com
