Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Сәлем, Хабр! Бүгін біз Spark Streaming көмегімен Apache Kafka хабарлама ағындарын өңдейтін және өңдеу нәтижелерін AWS RDS бұлттық дерекқорына жазатын жүйені құрастырамыз.

Белгілі бір несиелік мекеме бізге өзінің барлық филиалдары бойынша кіріс транзакцияларын «ұшу кезінде» өңдеу міндетін қояды деп елестетіп көрейік. Бұл қазынашылық үшін ашық валюталық позицияны, операциялар бойынша лимиттерді немесе қаржылық нәтижелерді жедел есептеу мақсатында жасалуы мүмкін.

Бұл істі сиқырлы және сиқырлы заклинанияларды қолданбай қалай жүзеге асыруға болады - кесу астында оқыңыз! Бар!

Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу
(Сурет көзі)

Кіріспе

Әрине, нақты уақыт режимінде деректердің үлкен көлемін өңдеу заманауи жүйелерде пайдалану үшін кең мүмкіндіктер береді. Бұл үшін ең танымал комбинациялардың бірі - Apache Kafka және Spark Streaming тандемі, мұнда Кафка кіріс хабарлама пакеттерінің ағынын жасайды, ал Spark Streaming бұл пакеттерді берілген уақыт интервалында өңдейді.

Қолданбаның ақауларға төзімділігін арттыру үшін біз бақылау нүктелерін қолданамыз. Бұл механизмнің көмегімен Spark Streaming қозғалтқышы жоғалған деректерді қалпына келтіру қажет болғанда, ол тек соңғы бақылау нүктесіне оралып, сол жерден есептеулерді жалғастыруы керек.

Әзірленген жүйенің архитектурасы

Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Қолданылатын компоненттер:

  • Apache Kafka таратылған жариялау-жазылу хабар алмасу жүйесі болып табылады. Офлайн және онлайн хабарламаларды тұтыну үшін қолайлы. Деректердің жоғалуын болдырмау үшін Кафка хабарламалары дискіде сақталады және кластер ішінде қайталанады. Кафка жүйесі ZooKeeper синхрондау қызметінің үстіне құрастырылған;
  • Apache Spark ағыны - Ағындық деректерді өңдеуге арналған Spark компоненті. Spark Streaming модулі деректер ағыны шағын деректер пакеттерінің үздіксіз тізбегі ретінде түсіндірілетін микро пакеттік архитектураны қолдану арқылы құрастырылған. Spark Streaming әртүрлі көздерден деректерді алады және оларды шағын пакеттерге біріктіреді. Жаңа пакеттер тұрақты аралықпен жасалады. Әрбір уақыт аралығының басында жаңа пакет жасалады және осы аралықта алынған кез келген деректер пакетке қосылады. Аралықтың соңында пакеттің өсуі тоқтайды. Аралықтың өлшемі пакеттік интервал деп аталатын параметрмен анықталады;
  • Apache Spark SQL - реляциялық өңдеуді Spark функционалды бағдарламалауымен біріктіреді. Құрылымдық деректер схемасы бар деректерді білдіреді, яғни барлық жазбалар үшін өрістердің жалғыз жиыны. Spark SQL әр түрлі құрылымдық деректер көздерінен енгізуді қолдайды және схемалық ақпараттың қолжетімділігі арқасында ол жазбалардың қажетті өрістерін ғана тиімді шығарып алады, сонымен қатар DataFrame API интерфейстерін қамтамасыз етеді;
  • AWS RDS салыстырмалы түрде арзан бұлтқа негізделген реляциялық дерекқор, орнатуды, пайдалануды және масштабтауды жеңілдететін және Amazon арқылы тікелей басқарылатын веб-қызмет.

Кафка серверін орнату және іске қосу

Кафканы тікелей қолданбас бұрын сізде Java бар екеніне көз жеткізу керек, себебі... JVM жұмыс үшін пайдаланылады:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Кафкамен жұмыс істеу үшін жаңа пайдаланушыны жасайық:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Содан кейін таратуды Apache Kafka ресми веб-сайтынан жүктеп алыңыз:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Жүктелген мұрағатты орау:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Келесі қадам міндетті емес. Мәселе мынада, әдепкі параметрлер Apache Kafka-ның барлық мүмкіндіктерін толығымен пайдалануға мүмкіндік бермейді. Мысалы, хабарлар жарияланатын тақырыпты, санатты, топты жойыңыз. Мұны өзгерту үшін конфигурация файлын өңдейік:

vim ~/kafka/config/server.properties

Файлдың соңына келесіні қосыңыз:

delete.topic.enable = true

Кафка серверін іске қоспас бұрын ZooKeeper серверін іске қосу керек; біз Kafka таратуымен бірге келетін көмекші сценарийді қолданамыз:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper сәтті іске қосылғаннан кейін, Кафка серверін бөлек терминалда іске қосыңыз:

bin/kafka-server-start.sh config/server.properties

Транзакция деп аталатын жаңа тақырып құрайық:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Бөлімдердің қажетті саны мен репликасы бар тақырып жасалғанына көз жеткізіңіз:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Жаңадан құрылған тақырып бойынша өндіруші мен тұтынушыны сынау сәттерін жіберіп алайық. Хабарламаларды жіберу және алуды қалай тексеруге болатыны туралы толығырақ ақпарат ресми құжаттамада жазылған - Кейбір хабарламалар жіберіңіз. Біз KafkaProducer API арқылы Python тілінде продюсер жазуға көшеміз.

Продюсер жазуы

Өндіруші кездейсоқ деректерді жасайды - секунд сайын 100 хабарлама. Кездейсоқ деректер деп үш өрістен тұратын сөздікті айтамыз:

  • филиал — несие мекемесінің сауда нүктесінің атауы;
  • валюта — мәміле валютасы;
  • сомасы — транзакция сомасы. Банк валютаны сатып алған жағдайда сома оң сан, ал сату болса теріс сан болады.

Өндірушінің коды келесідей көрінеді:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Әрі қарай, жіберу әдісін қолдана отырып, серверге JSON пішімінде қажетті тақырыпқа хабарлама жібереміз:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Скриптті іске қосқан кезде біз терминалда келесі хабарламаларды аламыз:

Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Бұл бәрі біз қалағандай жұмыс істейді дегенді білдіреді - продюсер бізге қажет тақырыпқа хабарламалар жасайды және жібереді.
Келесі қадам - ​​Spark орнату және осы хабар ағынын өңдеу.

Apache Spark орнату

Apache Spark әмбебап және өнімділігі жоғары кластерлік есептеу платформасы болып табылады.

Spark интерактивті сұраулар мен ағынды өңдеуді қоса алғанда, есептеу түрлерінің кең ауқымын қолдай отырып, MapReduce үлгісінің танымал іске асыруларына қарағанда жақсырақ жұмыс істейді. Үлкен көлемдегі деректерді өңдеу кезінде жылдамдық маңызды рөл атқарады, өйткені бұл минуттар мен сағаттарды күтпей интерактивті жұмыс істеуге мүмкіндік беретін жылдамдық. Spark-тың оны жылдам ететін ең үлкен күштерінің бірі - оның жадтағы есептеулерді орындау қабілеті.

Бұл құрылым Scala тілінде жазылған, сондықтан алдымен оны орнату керек:

sudo apt-get install scala

Spark дистрибутивін ресми веб-сайттан жүктеп алыңыз:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Мұрағатты ашыңыз:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

bash файлына Spark жолын қосыңыз:

vim ~/.bashrc

Редактор арқылы келесі жолдарды қосыңыз:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc өзгертулерін енгізгеннен кейін төмендегі пәрменді іске қосыңыз:

source ~/.bashrc

AWS PostgreSQL қолдану

Біз ағындардан өңделген ақпаратты жүктеп салатын дерекқорды орналастыру ғана қалады. Ол үшін AWS RDS қызметін қолданамыз.

AWS консоліне өтіңіз -> AWS RDS -> Дерекқорлар -> Деректер базасын жасаңыз:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

PostgreSQL таңдаңыз және «Келесі» түймесін басыңыз:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Өйткені Бұл мысал тек білім беру мақсаттарына арналған; біз «кем дегенде» тегін серверді қолданамыз (Тегін деңгей):
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Әрі қарай, біз Free Tier блогына құсбелгі қоямыз, содан кейін бізге автоматты түрде t2.micro класының данасы ұсынылады - әлсіз болса да, ол тегін және біздің тапсырмамызға өте қолайлы:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Әрі қарай өте маңызды нәрселер келеді: дерекқор данасы аты, негізгі пайдаланушының аты және оның құпия сөзі. Данаға ат берейік: myHabrTest, негізгі пайдаланушы: хабр, құпия сөз: habr12345 және «Келесі» түймесін басыңыз:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Келесі бетте дерекқор серверінің сырттан қол жетімділігіне жауап беретін параметрлер бар (жалпыға қолжетімділік) және порттың қолжетімділігі:

Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

5432 (PostgreSQL) порты арқылы дерекқор серверіне сыртқы қол жеткізуге мүмкіндік беретін VPC қауіпсіздік тобы үшін жаңа параметрді жасайық.
AWS консоліне бөлек браузер терезесінде VPC бақылау тақтасы -> Қауіпсіздік топтары -> Қауіпсіздік тобын жасау бөліміне өтейік:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Біз Қауіпсіздік тобының атауын орнаттық - PostgreSQL, сипаттама, бұл топтың қандай VPC-мен байланыстырылуы керектігін көрсетіңіз және Жасау түймесін басыңыз:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Төмендегі суретте көрсетілгендей жаңадан жасалған топ үшін 5432 портының кіріс ережелерін толтырыңыз. Портты қолмен көрсете алмайсыз, бірақ Түрі ашылмалы тізімінен PostgreSQL таңдаңыз.

Қатаң айтқанда, ::/0 мәні серверге әлемнің түкпір-түкпірінен келетін кіріс трафигінің қолжетімділігін білдіреді, бұл канондық түрде мүлдем дұрыс емес, бірақ мысалды талдау үшін осы тәсілді қолдануға рұқсат етейік:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Біз шолғыш бетіне ораламыз, онда бізде «Қосымша параметрлерді конфигурациялау» ашылады және VPC қауіпсіздік топтары бөлімінен -> Бар VPC қауіпсіздік топтарын таңдау -> PostgreSQL бөлімін таңдаңыз:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Әрі қарай, Дерекқор опцияларында -> Дерекқор атауы -> атауды орнатыңыз - habrDB.

Қалған параметрлерді әдепкі бойынша сақтық көшірмені (сақтық көшірмені сақтау мерзімі - 0 күн), мониторинг пен Performance Insights өшіруді қоспағанда қалдыра аламыз. Түймені басыңыз Мәліметтер базасын құру:
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Жіп өңдеушісі

Соңғы кезең әр екі секунд сайын Кафкадан келетін жаңа деректерді өңдейтін және нәтижені дерекқорға енгізетін Spark тапсырмасын әзірлеу болады.

Жоғарыда айтылғандай, бақылау нүктелері SparkStreaming жүйесіндегі негізгі механизм болып табылады, ол ақауларға төзімділікті қамтамасыз ету үшін конфигурациялануы керек. Біз бақылау нүктелерін қолданамыз және егер процедура сәтсіз болса, Spark Streaming модулі жоғалған деректерді қалпына келтіру үшін соңғы бақылау нүктесіне оралып, одан есептеулерді жалғастыруы керек.

Бақылау нүктесінің ақпараты сақталатын қатеге төзімді, сенімді файлдық жүйеде (мысалы, HDFS, S3 және т.б.) каталог орнату арқылы тексеру нүктесін қосуға болады. Бұл, мысалы:

streamingContext.checkpoint(checkpointDirectory)

Біздің мысалда біз келесі тәсілді қолданамыз, атап айтқанда, егер checkpointDirectory бар болса, контекст бақылау нүктесі деректерінен қайта жасалады. Егер каталог жоқ болса (яғни бірінші рет орындалса), онда functionToCreateContext жаңа контекст жасау және DStreams конфигурациялау үшін шақырылады:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils кітапханасының createDirectStream әдісі арқылы «транзакция» тақырыбына қосылу үшін DirectStream нысанын жасаймыз:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON пішіміндегі кіріс деректерді талдау:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL көмегімен біз қарапайым топтастыруды орындаймыз және нәтижені консольде көрсетеміз:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Сұрау мәтінін алу және оны Spark SQL арқылы іске қосу:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Содан кейін алынған жинақталған деректерді AWS RDS ішіндегі кестеге сақтаймыз. Агрегация нәтижелерін дерекқор кестесіне сақтау үшін DataFrame нысанының жазу әдісін қолданамыз:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS қосылымын орнату туралы бірнеше сөз. Біз ол үшін пайдаланушы мен құпия сөзді «AWS PostgreSQL қолдану» қадамында жасадық. Байланыс және қауіпсіздік бөлімінде көрсетілетін дерекқор серверінің URL мекенжайы ретінде соңғы нүктені пайдалану керек:

Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Spark пен Кафканы дұрыс қосу үшін жұмысты артефакт арқылы smark-submit арқылы орындау керек. spark-streaming-kafka-0-8_2.11. Сонымен қатар, біз PostgreSQL дерекқорымен өзара әрекеттесу үшін артефакт қолданамыз; біз оларды --packages арқылы тасымалдаймыз.

Сценарийдің икемділігі үшін біз кіріс параметрлері ретінде хабар серверінің атын және деректерді алғымыз келетін тақырыпты қосамыз.

Сонымен, жүйенің функционалдығын іске қосу және тексеру уақыты келді:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Барлығы орындалды! Төмендегі суретте көріп отырғаныңыздай, қолданба жұмыс істеп тұрған кезде, жаңа жинақтау нәтижелері әрбір 2 секунд сайын шығарылады, өйткені біз StreamingContext нысанын жасаған кезде пакеттік аралықты 2 секундқа орнаттық:

Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

Содан кейін кестеде жазбалардың бар-жоғын тексеру үшін мәліметтер базасына қарапайым сұраныс жасаймыз транзакция_ағыны:

Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу

қорытынды

Бұл мақала Apache Kafka және PostgreSQL-пен бірге Spark Streaming көмегімен ақпаратты ағынмен өңдеу мысалын қарастырды. Әртүрлі көздерден алынған деректердің өсуімен ағынды және нақты уақыттағы қосымшаларды жасау үшін Spark Streaming практикалық мәнін асыра бағалау қиын.

Толық бастапқы кодты менің репозиторийден таба аласыз GitHub.

Мен осы мақаланы талқылауға қуаныштымын, сіздердің пікірлеріңізді күтемін, сонымен қатар барлық қамқор оқырмандардан сындарлы сын күтемін.

Сәттілік тілеймін!

Пс. Бастапқыда жергілікті PostgreSQL дерекқорын пайдалану жоспарланған болатын, бірақ AWS-ге деген сүйіспеншілігімді ескере отырып, мен дерекқорды бұлтқа көшіруді шештім. Осы тақырып бойынша келесі мақалада мен AWS Kinesis және AWS EMR көмегімен жоғарыда сипатталған бүкіл жүйені AWS жүйесінде қалай енгізу керектігін көрсетемін. Жаңалықты бақылаңыз!

Ақпарат көзі: www.habr.com

пікір қалдыру