Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Сайн уу, Хабр! Өнөөдөр бид Apache Kafka мессежийн урсгалыг Spark Streaming ашиглан боловсруулж, боловсруулалтын үр дүнг AWS RDS үүлэн мэдээллийн санд бичих системийг бүтээх болно.

Тодорхой нэг зээлийн байгууллага бидэнд ирж буй гүйлгээг бүх салбараараа "шууд" боловсруулах үүрэг даалгавар өгдөг гэж төсөөлөөд үз дээ. Үүнийг төрийн сангийн нээлттэй валютын байрлал, гүйлгээний хязгаар эсвэл санхүүгийн үр дүнг цаг алдалгүй тооцоолох зорилгоор хийж болно.

Энэ хэргийг ид шид, ид шидийн ид шид хэрэглэхгүйгээр хэрхэн хэрэгжүүлэх вэ - зүсэлтийн доор уншина уу! Яв!

Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах
(Зургийн эх сурвалж)

Танилцуулга

Мэдээжийн хэрэг, их хэмжээний өгөгдлийг бодит цаг хугацаанд боловсруулах нь орчин үеийн системд ашиглах өргөн боломжийг олгодог. Үүний хамгийн алдартай хослолуудын нэг нь Apache Kafka болон Spark Streaming-ийн тандем бөгөөд Кафка нь ирж буй мессежийн пакетуудын урсгалыг үүсгэдэг бөгөөд Spark Streaming нь эдгээр пакетуудыг өгөгдсөн хугацааны интервалаар боловсруулдаг.

Програмын алдааг тэсвэрлэх чадварыг нэмэгдүүлэхийн тулд бид хяналтын цэгүүдийг ашиглана. Энэхүү механизмын тусламжтайгаар Spark Streaming хөдөлгүүр алдагдсан өгөгдлийг сэргээх шаардлагатай үед зөвхөн сүүлчийн хяналтын цэг рүү буцаж очоод, тэндээс тооцоогоо үргэлжлүүлэх хэрэгтэй.

Боловсруулсан системийн архитектур

Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Ашигласан бүрэлдэхүүн хэсгүүд:

  • Apache Kafka түгээсэн нийтлэх-захиалах мессежийн систем юм. Офлайн болон онлайн мессеж хэрэглэхэд тохиромжтой. Өгөгдлийн алдагдлаас урьдчилан сэргийлэхийн тулд Кафка мессежийг дискэнд хадгалж, кластер дотор хуулбарладаг. Кафка систем нь ZooKeeper синхрончлолын үйлчилгээний дээр суурилагдсан;
  • Apache Spark Streaming - Урсгалын өгөгдлийг боловсруулах Spark бүрэлдэхүүн хэсэг. Spark Streaming модуль нь бичил багц архитектурыг ашиглан бүтээгдсэн бөгөөд өгөгдлийн урсгалыг жижиг өгөгдлийн багцуудын тасралтгүй дараалал гэж тайлбарладаг. Spark Streaming нь өөр өөр эх сурвалжаас өгөгдлийг авч жижиг багц болгон нэгтгэдэг. Шинэ багцуудыг тодорхой давтамжтайгаар үүсгэдэг. Хугацааны интервал бүрийн эхэнд шинэ пакет үүсгэгдэх бөгөөд энэ хугацаанд хүлээн авсан өгөгдөл нь багцад багтана. Интервалын төгсгөлд пакетийн өсөлт зогсдог. Интервалын хэмжээ нь багцын интервал гэж нэрлэгддэг параметрээр тодорхойлогддог;
  • Apache Spark SQL - харилцааны боловсруулалтыг Spark функциональ програмчлалтай хослуулсан. Бүтэцлэгдсэн өгөгдөл гэдэг нь схем бүхий өгөгдөл, өөрөөр хэлбэл бүх бичлэгийн нэг багц талбарыг хэлнэ. Spark SQL нь янз бүрийн бүтэцтэй өгөгдлийн эх сурвалжаас оролтыг дэмждэг бөгөөд схемийн мэдээлэл байгаагийн ачаар зөвхөн шаардлагатай бичлэгийн талбаруудыг үр ашигтайгаар татаж авахаас гадна DataFrame API-уудаар хангадаг;
  • AWS RDS нь харьцангуй хямд үүлэнд суурилсан харилцааны мэдээллийн сан, тохиргоо, ажиллагаа, масштабыг хялбаршуулдаг вэб үйлчилгээ бөгөөд Amazon шууд удирддаг.

Кафка серверийг суулгаж ажиллуулж байна

Кафкаг шууд ашиглахаасаа өмнө Java байгаа эсэхийг шалгах хэрэгтэй, учир нь... JVM нь ажилд ашиглагддаг:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Кафкатай ажиллах шинэ хэрэглэгч үүсгэцгээе:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Дараа нь Apache Kafka албан ёсны вэбсайтаас түгээлтийг татаж аваарай.

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Татаж авсан архивыг задлах:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Дараагийн алхам нь сонголттой. Үнэн хэрэгтээ анхдагч тохиргоо нь Apache Kafka-ийн бүх чадварыг бүрэн ашиглах боломжийг олгодоггүй. Жишээлбэл, мессеж нийтлэх боломжтой сэдэв, ангилал, бүлгийг устгана уу. Үүнийг өөрчлөхийн тулд тохиргооны файлыг засъя:

vim ~/kafka/config/server.properties

Файлын төгсгөлд дараахь зүйлийг нэмнэ үү.

delete.topic.enable = true

Кафка серверийг эхлүүлэхийн өмнө ZooKeeper серверийг эхлүүлэх шаардлагатай бөгөөд бид Кафка түгээлттэй хамт ирдэг туслах скриптийг ашиглах болно:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper амжилттай ажиллаж эхэлсний дараа Кафка серверийг тусдаа терминал дээр ажиллуулна уу:

bin/kafka-server-start.sh config/server.properties

Гүйлгээ гэдэг шинэ сэдвийг үүсгэцгээе:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Шаардлагатай тооны хуваалт, хуулбар бүхий сэдвийг үүсгэсэн эсэхийг шалгацгаая:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Шинээр бий болсон сэдвийн хувьд үйлдвэрлэгч, хэрэглэгч хоёрыг турших мөчүүдийг алдцгаая. Та мессеж илгээх, хүлээн авахыг хэрхэн шалгах талаар дэлгэрэнгүй мэдээллийг албан ёсны баримт бичигт бичсэн болно - Зарим мессеж илгээнэ үү. За, бид KafkaProducer API ашиглан Python хэл дээр продюсер бичихээр явж байна.

Продюсер бичих

Үйлдвэрлэгч санамсаргүй өгөгдөл үүсгэх болно - секунд тутамд 100 мессеж. Санамсаргүй өгөгдөл гэж бид гурван талбараас бүрдсэн толь бичгийг хэлнэ.

  • салбар - зээлийн байгууллагын борлуулалтын цэгийн нэр;
  • Валютын - гүйлгээний валют;
  • хэмжээ - гүйлгээний дүн. Хэрэв Банкнаас валют худалдаж авсан бол эерэг тоо, худалдах бол сөрөг тоо байна.

Үйлдвэрлэгчийн код дараах байдалтай байна.

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Дараа нь илгээх аргыг ашиглан бид сервер рүү JSON форматаар хэрэгтэй сэдвээр мессеж илгээдэг.

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Скриптийг ажиллуулах үед бид терминал дээр дараах мессежийг хүлээн авдаг.

Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Энэ нь бүх зүйл бидний хүссэнээр ажилладаг гэсэн үг юм - продюсер нь бидэнд хэрэгтэй сэдвээр мессеж үүсгэж, илгээдэг.
Дараагийн алхам бол Spark-г суулгаж, энэ мессежийн урсгалыг боловсруулах явдал юм.

Apache Spark суулгаж байна

Apache Spark нь бүх нийтийн, өндөр хүчин чадалтай кластер тооцоолох платформ юм.

Spark нь MapReduce загварын түгээмэл хэрэгжүүлэлтээс илүү сайн гүйцэтгэлтэй бөгөөд интерактив асуулга, урсгал боловсруулах зэрэг өргөн хүрээний тооцооллын төрлийг дэмждэг. Хурд нь их хэмжээний өгөгдлийг боловсруулахад чухал үүрэг гүйцэтгэдэг, учир нь энэ нь танд хэдэн минут, цаг хүлээхгүйгээр интерактив ажиллах боломжийг олгодог. Spark-ийг маш хурдан болгодог хамгийн том давуу талуудын нэг нь санах ойн тооцоолол хийх чадвар юм.

Энэ хүрээ нь Scala дээр бичигдсэн тул та эхлээд суулгах хэрэгтэй:

sudo apt-get install scala

Spark түгээлтийг албан ёсны вэбсайтаас татаж авах:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Архивыг задлах:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Spark-ийн замыг bash файлд нэмнэ үү:

vim ~/.bashrc

Засварлагчаар дамжуулан дараах мөрүүдийг нэмнэ үү.

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc-д өөрчлөлт оруулсны дараа доорх командыг ажиллуулна уу:

source ~/.bashrc

AWS PostgreSQL-г байрлуулж байна

Зөвхөн урсгалаас боловсруулсан мэдээллийг байршуулах мэдээллийн баазыг байрлуулах л үлдлээ. Үүний тулд бид AWS RDS үйлчилгээг ашиглах болно.

AWS консол -> AWS RDS -> Өгөгдлийн сан -> Өгөгдлийн сан үүсгэх рүү очно уу:
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

PostgreSQL-г сонгоод Next дарна уу:
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Учир нь Энэ жишээ нь зөвхөн боловсролын зорилгоор зориулагдсан бөгөөд бид "хамгийн багадаа" үнэгүй сервер ашиглах болно (Үнэгүй шат):
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Дараа нь бид Free Tier блок дээр тэмдэг тавьж, үүний дараа бидэнд t2.micro ангийн жишээг автоматаар санал болгох болно - сул боловч энэ нь үнэ төлбөргүй бөгөөд бидний даалгаварт маш тохиромжтой.
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Дараа нь маш чухал зүйлүүд гарч ирнэ: өгөгдлийн сангийн жишээний нэр, мастер хэрэглэгчийн нэр, түүний нууц үг. Жишээг нэрлэе: myHabrTest, мастер хэрэглэгч: хабр, нууц үг: habr12345 Дараа нь товчийг дарна уу:
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Дараагийн хуудсан дээр манай өгөгдлийн сангийн серверийн гаднаас хандах хандалт (Нийтийн хүртээмж) болон портын хүртээмжийг хариуцдаг параметрүүд байна.

Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

VPC аюулгүй байдлын бүлэгт зориулж 5432 (PostgreSQL) портоор дамжуулан манай мэдээллийн сангийн серверт гаднаас хандах боломжийг олгох шинэ тохиргоог хийцгээе.
AWS консол руу тусдаа хөтчийн цонхноос VPC хяналтын самбар -> Аюулгүй байдлын бүлгүүд -> Аюулгүй байдлын бүлэг үүсгэх хэсэг рүү очъё:
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Бид Аюулгүй байдлын бүлгийн нэрийг тохируулсан - PostgreSQL, тайлбар, энэ бүлэг ямар VPC-тэй холбогдохыг зааж өгөөд Үүсгэх товчийг дарна уу:
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Доорх зурагт үзүүлсэн шиг шинээр үүсгэсэн бүлгийн 5432 портын Inbound дүрмийг бөглөнө үү. Та портыг гараар зааж өгөх боломжгүй, харин Төрөл унадаг жагсаалтаас PostgreSQL-г сонгоно уу.

Хатуухан хэлэхэд ::/0 утга нь дэлхийн өнцөг булан бүрээс сервер рүү ирж буй траффикийн бэлэн байдлыг илэрхийлдэг бөгөөд энэ нь каноникийн хувьд бүрэн үнэн биш боловч жишээнд дүн шинжилгээ хийхийн тулд энэ аргыг ашиглахыг зөвшөөрнө үү:
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Бид хөтчийн хуудас руу буцаж, "Нарийвчилсан тохиргоог тохируулах" хэсгийг нээж, VPC аюулгүй байдлын бүлгүүдийг сонгох -> Одоо байгаа VPC аюулгүй байдлын бүлгийг сонгох -> PostgreSQL:
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Дараа нь Өгөгдлийн сангийн сонголтууд -> Өгөгдлийн сангийн нэр -> нэрийг тохируулах - habrDB.

Анхдагчаар нөөцлөлт (нөөц хадгалах хугацаа - 0 хоног), хяналт, гүйцэтгэлийн ойлголтыг идэвхгүй болгохоос бусад тохиолдолд бид үлдсэн параметрүүдийг орхиж болно. Товчлуур дээр дарна уу Мэдээллийн сан үүсгэх:
Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Утас зохицуулагч

Эцсийн шат нь Кафкагаас хоёр секунд тутамд ирж буй шинэ өгөгдлийг боловсруулж, үр дүнг мэдээллийн санд оруулах Spark ажлыг боловсруулах явдал юм.

Дээр дурдсанчлан хяналтын цэгүүд нь SparkStreaming-ийн үндсэн механизм бөгөөд алдааг тэсвэрлэх чадвартай байхын тулд тохируулагдсан байх ёстой. Бид хяналтын цэгүүдийг ашиглах бөгөөд хэрэв процедур амжилтгүй болбол Spark Streaming модуль нь алдагдсан өгөгдлийг сэргээхийн тулд зөвхөн сүүлчийн хяналтын цэг рүү буцаж очоод тооцооллыг үргэлжлүүлэх шаардлагатай болно.

Гэмтэлд тэсвэртэй, найдвартай файлын систем (HDFS, S3 гэх мэт) дээр хяналтын цэгийн мэдээллийг хадгалах лавлахыг тохируулснаар шалгах цэгийг идэвхжүүлж болно. Үүнийг жишээ нь ашиглан хийдэг:

streamingContext.checkpoint(checkpointDirectory)

Бидний жишээн дээр бид дараах аргыг ашиглах болно, тухайлбал, хэрэв checkpointDirectory байгаа бол контекстийг хяналтын цэгийн өгөгдлөөс дахин үүсгэх болно. Хэрэв лавлах байхгүй бол (жишээ нь, анх удаа ажиллуулж байгаа бол) шинэ контекст үүсгэж, DStreams-ийг тохируулахын тулд functionToCreateContext-ийг дуудна:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Бид KafkaUtils номын сангийн createDirectStream аргыг ашиглан "гүйлгээ" сэдэвт холбогдох DirectStream объектыг үүсгэдэг.

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON форматаар ирж буй өгөгдлийг задлан шинжлэх:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL ашиглан бид энгийн бүлэглэл хийж, үр дүнг консол дээр харуулна.

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Асуулгын текстийг авч, Spark SQL-ээр дамжуулан ажиллуулж байна:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Дараа нь бид нэгтгэсэн өгөгдлийг AWS RDS дахь хүснэгтэд хадгалдаг. Нэгтгэсэн үр дүнг мэдээллийн сангийн хүснэгтэд хадгалахын тулд бид DataFrame объектын бичих аргыг ашиглана.

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS-тэй холбогдох талаар хэдэн үг хэлье. Бид "AWS PostgreSQL-г байршуулах" алхам дээр хэрэглэгчийн болон нууц үгийг үүсгэсэн. Та Endpoint-г өгөгдлийн сангийн серверийн URL болгон ашиглах ёстой бөгөөд энэ нь Холболт ба аюулгүй байдлын хэсэгт харагдаж байна.

Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Spark болон Кафка хоёрыг зөв холбохын тулд та олдворыг ашиглан smark-submit-ээр ажлыг гүйцэтгэх ёстой. spark-streaming-kafka-0-8_2.11. Нэмж дурдахад бид PostgreSQL өгөгдлийн сантай харилцахын тулд олдворыг ашиглах болно; бид тэдгээрийг --packages-ээр дамжуулах болно.

Скриптийг уян хатан болгохын тулд бид мессежийн серверийн нэр болон өгөгдөл хүлээн авах сэдвийг оруулах параметрүүдийг оруулах болно.

Тиймээс системийн ажиллагааг эхлүүлэх, шалгах цаг болжээ.

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Бүх зүйл бүтсэн! Доорх зурган дээрээс харахад програм ажиллаж байх үед бид StreamingContext объектыг үүсгэхдээ багцын интервалыг 2 секунд болгон тохируулсан тул 2 секунд тутамд шинэ нэгтгэлийн үр дүн гарч ирдэг.

Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

Дараа нь бид хүснэгтэд байгаа бичлэг байгаа эсэхийг шалгахын тулд мэдээллийн санд энгийн асуулга хийдэг гүйлгээний_урсгал:

Apache Kafka болон Spark Streaming-тэй дамжуулалтын өгөгдөл боловсруулах

дүгнэлт

Энэ нийтлэлд Apache Kafka болон PostgreSQL-тэй хамтран Spark Streaming ашиглан мэдээллийн урсгал боловсруулах жишээг авч үзсэн. Төрөл бүрийн эх сурвалжаас авсан өгөгдлийн өсөлттэй холбоотойгоор урсгал болон бодит цагийн хэрэглээг бий болгох Spark Streaming-ийн практик үнэ цэнийг хэт үнэлэхэд хэцүү байдаг.

Та бүрэн эх кодыг миний репозитороос олох боломжтой GitHub.

Би энэ нийтлэлийг хэлэлцэж байгаадаа баяртай байна, би таны сэтгэгдлийг тэсэн ядан хүлээж байна, мөн бүх халамжтай уншигчдаас бүтээлч шүүмжлэл хүлээж байна.

Чамд амжилт хүсье!

Др. Эхэндээ орон нутгийн PostgreSQL мэдээллийн санг ашиглахаар төлөвлөж байсан ч AWS-д дуртай байсан тул би мэдээллийн санг үүл рүү шилжүүлэхээр шийдсэн. Энэ сэдвээрх дараагийн өгүүллээр би AWS Kinesis болон AWS EMR ашиглан дээр дурдсан системийг бүхэлд нь AWS дээр хэрхэн хэрэгжүүлэхийг харуулах болно. Мэдээг дагаж мөрдөөрэй!

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх