Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Салам, Хабр! Бүгүн биз Spark Streaming аркылуу Apache Kafka билдирүүлөрүнүн агымдарын иштете турган жана иштетүү натыйжаларын AWS RDS булут базасына жаза турган системаны курабыз.

Келгиле, белгилүү бир кредиттик мекеме бизге өзүнүн бардык филиалдары боюнча келип түшкөн транзакцияларды "учуп баратканда" иштетүү милдетин койгонун элестетип көрөлү. Бул казыналык үчүн ачык валюта позициясын, лимиттерди же бүтүмдөрдүн финансылык натыйжаларын тез арада эсептөө максатында жасалышы мүмкүн, ж.б.

Бул ишти сыйкырдуу жана сыйкырдуу дубаларды колдонбостон кантип ишке ашыруу керек - кесип астында оку! Go!

Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү
(Сүрөт булагы)

тааныштыруу

Албетте, реалдуу убакыт режиминде маалыматтардын чоң көлөмүн иштетүү заманбап системаларда колдонууга кеңири мүмкүнчүлүктөрдү берет. Бул үчүн эң популярдуу комбинациялардын бири - Apache Kafka жана Spark Streaming тандеми, мында Кафка кирүүчү билдирүү пакеттеринин агымын түзөт, ал эми Spark Streaming бул пакеттерди берилген убакыт аралыгында иштетет.

Колдонмонун каталарга чыдамдуулугун жогорулатуу үчүн биз текшерүү пункттарын колдонобуз. Бул механизмдин жардамы менен, Spark Streaming кыймылдаткычы жоголгон маалыматтарды калыбына келтириши керек болгондо, акыркы текшерүү пунктуна кайтып келип, ошол жерден эсептөөлөрдү улантуу керек.

Иштеп чыккан системанын архитектурасы

Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Колдонулган компоненттер:

  • Апачи Кафка бөлүштүрүлгөн жарыялоо-жазылуу билдирүү системасы болуп саналат. Оффлайн жана онлайн билдирүү керектөө үчүн ылайыктуу. Маалыматтардын жоголушун алдын алуу үчүн Кафка билдирүүлөрү дискте сакталат жана кластердин ичинде репликацияланат. Кафка системасы ZooKeeper синхрондоштуруу кызматынын үстүнө курулган;
  • Apache Spark Streaming - агымдык маалыматтарды иштетүү үчүн Spark компоненти. Spark Streaming модулу микро-топтомдук архитектураны колдонуу менен курулган, мында маалымат агымы чакан маалымат пакеттеринин үзгүлтүксүз ырааттуулугу катары чечмеленет. Spark Streaming ар кандай булактардан маалыматтарды алып, аларды чакан пакеттерге бириктирет. Жаңы пакеттер үзгүлтүксүз түрдө түзүлөт. Ар бир убакыт аралыгынын башында жаңы пакет түзүлөт жана ошол аралыкта алынган бардык маалыматтар пакетке киргизилет. Интервалдын аягында пакеттин өсүшү токтойт. Интервалдын өлчөмү партия аралык деп аталган параметр менен аныкталат;
  • Apache Spark SQL - Spark функционалдык программалоо менен реляциялык иштетүүнү айкалыштырат. Структураланган маалыматтар схемасы бар маалыматтарды, башкача айтканда, бардык жазуулар үчүн талаалардын бирдиктүү топтомун билдирет. Spark SQL ар кандай структураланган маалымат булактарынан киргизүүнү колдойт жана схемалык маалыматтын болушунун аркасында ал жазуулардын талап кылынган талааларын гана эффективдүү ала алат, ошондой эле DataFrame API'лерин камсыздай алат;
  • AWS RDS салыштырмалуу арзан булутка негизделген реляциялык маалымат базасы, орнотууну, иштетүүнү жана масштабдоону жөнөкөйлөтүүчү желе кызматы жана түздөн-түз Amazon тарабынан башкарылат.

Кафка серверин орнотуу жана иштетүү

Кафканы түздөн-түз колдонуудан мурун, сизде Java бар экенин текшеришиңиз керек, анткени... JVM жумуш үчүн колдонулат:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Келгиле, Кафка менен иштөө үчүн жаңы колдонуучуну түзөлү:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Андан кийин, расмий Apache Kafka веб-сайтынан бөлүштүрүүнү жүктөп алыңыз:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Жүктөлгөн архивди таңгактан чыгарыңыз:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Кийинки кадам милдеттүү эмес. Чындыгында, демейки жөндөөлөр Apache Kafkaнын бардык мүмкүнчүлүктөрүн толук колдонууга мүмкүндүк бербейт. Мисалы, билдирүүлөр жарыялана турган теманы, категорияны, топту жок кылуу. Муну өзгөртүү үчүн конфигурация файлын түзөтөлү:

vim ~/kafka/config/server.properties

Файлдын аягына төмөнкүнү кошуңуз:

delete.topic.enable = true

Кафка серверин баштоодон мурун ZooKeeper серверин ишке киргизишиңиз керек; биз Кафка бөлүштүрүүсү менен келген жардамчы скриптти колдонобуз:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper ийгиликтүү ишке киргенден кийин, Kafka серверин өзүнчө терминалда ишке киргизиңиз:

bin/kafka-server-start.sh config/server.properties

Келгиле, транзакция деп аталган жаңы тема түзөлү:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Бөлүмдөрдүн жана репликациялардын керектүү саны бар тема түзүлгөнүн текшерели:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Жаңы түзүлгөн тема үчүн продюсер менен керектөөчүнү сынаган учурларды сагыналы. Билдирүүлөрдү жөнөтүүнү жана алууну кантип сынай ала тургандыгы тууралуу көбүрөөк маалымат расмий документтерде жазылган - Кээ бир билдирүүлөрдү жөнөтүү. Ооба, биз KafkaProducer API аркылуу Pythonдо продюсер жазууга өттүк.

Продюсер жазуу

Продюсер кокус маалыматтарды жаратат - секунд сайын 100 билдирүү. Кокус маалыматтар деп үч талаадан турган сөздүктү түшүнөбүз:

  • бутак - кредиттик мекеменин сатуу пунктунун аталышы;
  • акча — операциялык валюта;
  • суммасы - транзакциянын суммасы. Сумма, эгерде ал Банк тарабынан валютаны сатып алса, оң сан, ал эми сатуу болсо терс сан болот.

Продюсердин коду төмөнкүдөй көрүнөт:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Андан кийин, жөнөтүү ыкмасын колдонуп, серверге, керектүү темага JSON форматында билдирүү жөнөтөбүз:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Скриптти иштетип жатканда, терминалда төмөнкү билдирүүлөрдү алабыз:

Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Бул баары биз каалагандай иштейт дегенди билдирет - продюсер бизге керектүү темага билдирүүлөрдү жаратат жана жөнөтөт.
Кийинки кадам - ​​Spark орнотуу жана бул билдирүү агымын иштетүү.

Apache Spark орнотулууда

Apache Spark универсалдуу жана жогорку өндүрүмдүүлүктөгү кластердик эсептөө платформасы.

Spark MapReduce моделинин популярдуу ишке ашырууларына караганда жакшыраак иштейт, ошону менен бирге эсептөө түрлөрүнүн кеңири спектрин, анын ичинде интерактивдүү сурамдарды жана агымды иштетүүнү колдойт. Ылдамдык чоң көлөмдөгү маалыматтарды иштеп чыгууда маанилүү ролду ойнойт, анткени бул ылдамдык бир нече мүнөттөрдү же сааттарды күтпөстөн интерактивдүү иштөөгө мүмкүндүк берет. Sparkтын эң чоң күчтүү жактарынын бири - бул анын эс тутумдагы эсептөөлөрдү жүргүзүү жөндөмдүүлүгү.

Бул алкак Scala тилинде жазылган, андыктан адегенде аны орнотуу керек:

sudo apt-get install scala

Spark бөлүштүрүүнү расмий веб-сайттан жүктөп алыңыз:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Архивди ачуу:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

bash файлына Spark жолун кошуңуз:

vim ~/.bashrc

Редактор аркылуу төмөнкү саптарды кошуңуз:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrcге өзгөртүүлөрдү киргизгенден кийин төмөнкү буйрукту иштетиңиз:

source ~/.bashrc

AWS PostgreSQL жайылтуу

Биз агымдардан иштетилген маалыматты жүктөй турган маалымат базасын жайгаштыруу гана калды. Бул үчүн биз AWS RDS кызматын колдонобуз.

AWS консолуна өтүңүз -> AWS RDS -> Маалыматтар базасы -> Маалыматтар базасын түзүңүз:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

PostgreSQLди тандап, Кийинки баскычын басыңыз:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Анткени Бул мисал билим берүү максаттары үчүн гана; биз бекер серверди "минималдуу" (акысыз деңгээл) колдонобуз:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Андан кийин, биз Free Tier блогуна белги коебуз, андан кийин бизге автоматтык түрдө t2.micro классынын мисалы сунушталат - алсыз болсо да, ал акысыз жана биздин милдетибизге абдан ылайыктуу:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Андан кийин абдан маанилүү нерселер: маалымат базасынын инстанциясынын аты, башкы колдонуучунун аты жана анын сырсөзү. Мисалга ат коёлу: myHabrTest, башкы колдонуучу: хабр, купуя сөз: habr12345 жана Кийинки баскычын чыкылдатыңыз:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Кийинки бетте биздин маалымат базасы серверибиздин сырттан жеткиликтүүлүгүнө жооптуу параметрлер бар (Коомдук жеткиликтүүлүк) жана порттун жеткиликтүүлүгү:

Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Келгиле, VPC коопсуздук тобу үчүн жаңы жөндөө түзөлү, ал 5432 (PostgreSQL) порту аркылуу маалымат базасынын серверине тышкы кирүүгө мүмкүндүк берет.
Келгиле, AWS консолуна өзүнчө серепчи терезесинде VPC панелине баралы -> Коопсуздук топтору -> Коопсуздук тобун түзүү бөлүмү:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Биз Коопсуздук тобунун аталышын койдук - PostgreSQL, сүрөттөмө, бул топтун кайсы VPC менен байланыштырылышы керектигин көрсөтүп, Түзүү баскычын чыкылдатыңыз:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Төмөнкү сүрөттө көрсөтүлгөндөй, жаңы түзүлгөн топ үчүн 5432 порту үчүн Кирүүчү эрежелерди толтуруңуз. Портту кол менен көрсөтө албайсыз, бирок Тип ылдый түшүүчү тизмеден PostgreSQLди тандаңыз.

Тактап айтканда, ::/0 мааниси дүйнөнүн бардык булуң-бурчунан серверге кирүүчү трафиктин болушун билдирет, бул канондук жактан такыр туура эмес, бирок мисалды талдоо үчүн, өзүбүзгө ушул ыкманы колдонууга уруксат берели:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Биз браузердин барагына кайтып келебиз, анда бизде "Өркүндөтүлгөн орнотууларды конфигурациялоо" ачылып, VPC коопсуздук топтору бөлүмүнөн тандаңыз -> Учурдагы VPC коопсуздук топторун тандоо -> PostgreSQL:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Андан кийин, Маалымат базасынын параметрлеринде -> Маалымат базасынын аты -> атын орнотуңуз - habrDB.

Камдык көчүрмөнү өчүрүүнү (камдык көчүрмөнү сактоо мөөнөтү - 0 күн), мониторингди жана Performance Insights, демейки боюнча өчүрүүнү кошпогондо, калган параметрлерди калтыра алабыз. баскычын басыңыз Маалыматтар базасын түзүү:
Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Жип иштетүүчү

Акыркы этап Spark жумушун иштеп чыгуу болот, ал Кафкадан эки секунд сайын келген жаңы маалыматтарды иштетип, натыйжаны маалымат базасына киргизет.

Жогоруда белгиленгендей, текшерүү пункттары SparkStreamingдин негизги механизми болуп саналат, алар катага чыдамдуулукту камсыз кылуу үчүн конфигурацияланышы керек. Биз текшерүү пункттарын колдонобуз жана эгер процедурадан майнап чыкпаса, Spark Streaming модулу жоголгон маалыматтарды калыбына келтирүү үчүн акыркы өткөрүү пунктуна кайтып келип, андан эсептөөлөрдү улантуу керек.

Текшерүү пунктунун маалыматы сактала турган катага чыдамдуу, ишенимдүү файл тутумунда (мисалы, HDFS, S3 ж.б.) каталогду орнотуу аркылуу текшерүүнү иштетүүгө болот. Бул, мисалы, колдонуу менен жүзөгө ашырылат:

streamingContext.checkpoint(checkpointDirectory)

Биздин мисалда биз төмөнкү ыкманы колдонобуз, тактап айтканда, эгерде checkpointDirectory бар болсо, анда контекст текшерүү пунктунун маалыматтарынан кайра түзүлөт. Эгерде каталог жок болсо (б.а. биринчи жолу аткарылса), анда functionToCreateContext жаңы контекстти түзүү жана DStreams конфигурациялоо үчүн чакырылат:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils китепканасынын createDirectStream ыкмасын колдонуу менен "транзакция" темасына туташуу үчүн DirectStream объектин түзөбүз:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON форматында келген маалыматтарды талдоо:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQLди колдонуу менен биз жөнөкөй топтоштурууну жасайбыз жана натыйжаны консолдо көрсөтөбүз:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Суроо текстин алуу жана аны Spark SQL аркылуу иштетүү:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Андан кийин биз топтолгон маалыматтарды AWS RDSдеги таблицага сактайбыз. Агрегация натыйжаларын маалымат базасынын таблицасына сактоо үчүн биз DataFrame объектинин жазуу ыкмасын колдонобуз:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS менен байланышты орнотуу жөнүндө бир нече сөз. Биз ал үчүн колдонуучуну жана сырсөздү “AWS PostgreSQLди жайылтуу” кадамында түздүк. Сиз Endpoint'ти Байланыш жана коопсуздук бөлүмүндө көрсөтүлгөн маалымат базасы серверинин url катары колдонушуңуз керек:

Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Spark менен Кафканы туура туташтыруу үчүн, артефактты колдонуп smark-submit аркылуу жумушту иштетишиңиз керек. spark-streaming-kafka-0-8_2.11. Мындан тышкары, биз PostgreSQL маалымат базасы менен иштешүү үчүн артефактты колдонобуз; биз аларды --packages аркылуу өткөрүп беребиз.

Скрипттин ийкемдүүлүгү үчүн биз киргизүү параметрлери катары билдирүү серверинин атын жана маалыматтарды алууну каалаган теманы да кошобуз.

Ошентип, системанын иштешин ишке киргизүү жана текшерүү мезгили келди:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Баары ойдогудай болду! Төмөнкү сүрөттө көрүнүп тургандай, тиркеме иштеп жатканда, жаңы топтоо натыйжалары ар 2 секунд сайын чыгарылып турат, анткени биз StreamingContext объектин түзгөндө пакеттөө аралыгын 2 секундга койдук:

Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

Андан кийин, таблицада жазуулардын бар-жоктугун текшерүү үчүн маалымат базасына жөнөкөй сурам жасайбыз транзакция_агымы:

Apache Kafka жана Streaming маалыматтарды Spark Streaming менен иштетүү

жыйынтыктоо

Бул макалада Apache Kafka жана PostgreSQL менен бирдикте Spark Streamingди колдонуу менен маалыматты агым менен иштетүүнүн мисалы каралды. Ар кандай булактардан алынган маалыматтардын өсүшү менен, агымдык жана реалдуу убакыт тиркемелерин түзүү үчүн Spark Streamingдин практикалык маанисин ашыкча баалоо кыйын.

Толук баштапкы кодду менин репозиторийимден таба аласыз GitHub.

Мен бул макаланы талкуулоого кубанычтамын, сиздин комментарийлериңизди чыдамсыздык менен күтөм, ошондой эле бардык камкор окурмандардан конструктивдүү сынды күтөм.

Ийгилик каалайм!

Заб. Башында жергиликтүү PostgreSQL маалымат базасын колдонуу пландаштырылган, бирок AWSге болгон сүйүүмдү эске алып, мен маалымат базасын булутка жылдырууну чечтим. Бул темадагы кийинки макалада мен AWS Kinesis жана AWS EMR аркылуу AWSде жогоруда сүрөттөлгөн системаны кантип ишке ашырууну көрсөтөм. Жаңылыктарга көз салыңыз!

Source: www.habr.com

Комментарий кошуу