ProHoster > Блог > басқарма > Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу
Apache Кафка және Spark Streaming көмегімен ағынды деректерді өңдеу
Сәлем, Хабр! Бүгін біз Spark Streaming көмегімен Apache Kafka хабарлама ағындарын өңдейтін және өңдеу нәтижелерін AWS RDS бұлттық дерекқорына жазатын жүйені құрастырамыз.
Белгілі бір несиелік мекеме бізге өзінің барлық филиалдары бойынша кіріс транзакцияларын «ұшу кезінде» өңдеу міндетін қояды деп елестетіп көрейік. Бұл қазынашылық үшін ашық валюталық позицияны, операциялар бойынша лимиттерді немесе қаржылық нәтижелерді жедел есептеу мақсатында жасалуы мүмкін.
Бұл істі сиқырлы және сиқырлы заклинанияларды қолданбай қалай жүзеге асыруға болады - кесу астында оқыңыз! Бар!
Әрине, нақты уақыт режимінде деректердің үлкен көлемін өңдеу заманауи жүйелерде пайдалану үшін кең мүмкіндіктер береді. Бұл үшін ең танымал комбинациялардың бірі - Apache Kafka және Spark Streaming тандемі, мұнда Кафка кіріс хабарлама пакеттерінің ағынын жасайды, ал Spark Streaming бұл пакеттерді берілген уақыт интервалында өңдейді.
Қолданбаның ақауларға төзімділігін арттыру үшін біз бақылау нүктелерін қолданамыз. Бұл механизмнің көмегімен Spark Streaming қозғалтқышы жоғалған деректерді қалпына келтіру қажет болғанда, ол тек соңғы бақылау нүктесіне оралып, сол жерден есептеулерді жалғастыруы керек.
Әзірленген жүйенің архитектурасы
Қолданылатын компоненттер:
Apache Kafka таратылған жариялау-жазылу хабар алмасу жүйесі болып табылады. Офлайн және онлайн хабарламаларды тұтыну үшін қолайлы. Деректердің жоғалуын болдырмау үшін Кафка хабарламалары дискіде сақталады және кластер ішінде қайталанады. Кафка жүйесі ZooKeeper синхрондау қызметінің үстіне құрастырылған;
Apache Spark ағыны - Ағындық деректерді өңдеуге арналған Spark компоненті. Spark Streaming модулі деректер ағыны шағын деректер пакеттерінің үздіксіз тізбегі ретінде түсіндірілетін микро пакеттік архитектураны қолдану арқылы құрастырылған. Spark Streaming әртүрлі көздерден деректерді алады және оларды шағын пакеттерге біріктіреді. Жаңа пакеттер тұрақты аралықпен жасалады. Әрбір уақыт аралығының басында жаңа пакет жасалады және осы аралықта алынған кез келген деректер пакетке қосылады. Аралықтың соңында пакеттің өсуі тоқтайды. Аралықтың өлшемі пакеттік интервал деп аталатын параметрмен анықталады;
Apache Spark SQL - реляциялық өңдеуді Spark функционалды бағдарламалауымен біріктіреді. Құрылымдық деректер схемасы бар деректерді білдіреді, яғни барлық жазбалар үшін өрістердің жалғыз жиыны. Spark SQL әр түрлі құрылымдық деректер көздерінен енгізуді қолдайды және схемалық ақпараттың қолжетімділігі арқасында ол жазбалардың қажетті өрістерін ғана тиімді шығарып алады, сонымен қатар DataFrame API интерфейстерін қамтамасыз етеді;
AWS RDS салыстырмалы түрде арзан бұлтқа негізделген реляциялық дерекқор, орнатуды, пайдалануды және масштабтауды жеңілдететін және Amazon арқылы тікелей басқарылатын веб-қызмет.
Кафка серверін орнату және іске қосу
Кафканы тікелей қолданбас бұрын сізде Java бар екеніне көз жеткізу керек, себебі... JVM жұмыс үшін пайдаланылады:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Келесі қадам міндетті емес. Мәселе мынада, әдепкі параметрлер Apache Kafka-ның барлық мүмкіндіктерін толығымен пайдалануға мүмкіндік бермейді. Мысалы, хабарлар жарияланатын тақырыпты, санатты, топты жойыңыз. Мұны өзгерту үшін конфигурация файлын өңдейік:
vim ~/kafka/config/server.properties
Файлдың соңына келесіні қосыңыз:
delete.topic.enable = true
Кафка серверін іске қоспас бұрын ZooKeeper серверін іске қосу керек; біз Kafka таратуымен бірге келетін көмекші сценарийді қолданамыз:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper сәтті іске қосылғаннан кейін, Кафка серверін бөлек терминалда іске қосыңыз:
Жаңадан құрылған тақырып бойынша өндіруші мен тұтынушыны сынау сәттерін жіберіп алайық. Хабарламаларды жіберу және алуды қалай тексеруге болатыны туралы толығырақ ақпарат ресми құжаттамада жазылған - Кейбір хабарламалар жіберіңіз. Біз KafkaProducer API арқылы Python тілінде продюсер жазуға көшеміз.
Продюсер жазуы
Өндіруші кездейсоқ деректерді жасайды - секунд сайын 100 хабарлама. Кездейсоқ деректер деп үш өрістен тұратын сөздікті айтамыз:
филиал — несие мекемесінің сауда нүктесінің атауы;
валюта — мәміле валютасы;
сомасы — транзакция сомасы. Банк валютаны сатып алған жағдайда сома оң сан, ал сату болса теріс сан болады.
Әрі қарай, жіберу әдісін қолдана отырып, серверге JSON пішімінде қажетті тақырыпқа хабарлама жібереміз:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
Скриптті іске қосқан кезде біз терминалда келесі хабарламаларды аламыз:
Бұл бәрі біз қалағандай жұмыс істейді дегенді білдіреді - продюсер бізге қажет тақырыпқа хабарламалар жасайды және жібереді.
Келесі қадам - Spark орнату және осы хабар ағынын өңдеу.
Apache Spark орнату
Apache Spark әмбебап және өнімділігі жоғары кластерлік есептеу платформасы болып табылады.
Spark интерактивті сұраулар мен ағынды өңдеуді қоса алғанда, есептеу түрлерінің кең ауқымын қолдай отырып, MapReduce үлгісінің танымал іске асыруларына қарағанда жақсырақ жұмыс істейді. Үлкен көлемдегі деректерді өңдеу кезінде жылдамдық маңызды рөл атқарады, өйткені бұл минуттар мен сағаттарды күтпей интерактивті жұмыс істеуге мүмкіндік беретін жылдамдық. Spark-тың оны жылдам ететін ең үлкен күштерінің бірі - оның жадтағы есептеулерді орындау қабілеті.
Бұл құрылым Scala тілінде жазылған, сондықтан алдымен оны орнату керек:
sudo apt-get install scala
Spark дистрибутивін ресми веб-сайттан жүктеп алыңыз:
PostgreSQL таңдаңыз және «Келесі» түймесін басыңыз:
Өйткені Бұл мысал тек білім беру мақсаттарына арналған; біз «кем дегенде» тегін серверді қолданамыз (Тегін деңгей):
Әрі қарай, біз Free Tier блогына құсбелгі қоямыз, содан кейін бізге автоматты түрде t2.micro класының данасы ұсынылады - әлсіз болса да, ол тегін және біздің тапсырмамызға өте қолайлы:
Әрі қарай өте маңызды нәрселер келеді: дерекқор данасы аты, негізгі пайдаланушының аты және оның құпия сөзі. Данаға ат берейік: myHabrTest, негізгі пайдаланушы: хабр, құпия сөз: habr12345 және «Келесі» түймесін басыңыз:
Келесі бетте дерекқор серверінің сырттан қол жетімділігіне жауап беретін параметрлер бар (жалпыға қолжетімділік) және порттың қолжетімділігі:
5432 (PostgreSQL) порты арқылы дерекқор серверіне сыртқы қол жеткізуге мүмкіндік беретін VPC қауіпсіздік тобы үшін жаңа параметрді жасайық.
AWS консоліне бөлек браузер терезесінде VPC бақылау тақтасы -> Қауіпсіздік топтары -> Қауіпсіздік тобын жасау бөліміне өтейік:
Біз Қауіпсіздік тобының атауын орнаттық - PostgreSQL, сипаттама, бұл топтың қандай VPC-мен байланыстырылуы керектігін көрсетіңіз және Жасау түймесін басыңыз:
Төмендегі суретте көрсетілгендей жаңадан жасалған топ үшін 5432 портының кіріс ережелерін толтырыңыз. Портты қолмен көрсете алмайсыз, бірақ Түрі ашылмалы тізімінен PostgreSQL таңдаңыз.
Қатаң айтқанда, ::/0 мәні серверге әлемнің түкпір-түкпірінен келетін кіріс трафигінің қолжетімділігін білдіреді, бұл канондық түрде мүлдем дұрыс емес, бірақ мысалды талдау үшін осы тәсілді қолдануға рұқсат етейік:
Біз шолғыш бетіне ораламыз, онда бізде «Қосымша параметрлерді конфигурациялау» ашылады және VPC қауіпсіздік топтары бөлімінен -> Бар VPC қауіпсіздік топтарын таңдау -> PostgreSQL бөлімін таңдаңыз:
Әрі қарай, Дерекқор опцияларында -> Дерекқор атауы -> атауды орнатыңыз - habrDB.
Қалған параметрлерді әдепкі бойынша сақтық көшірмені (сақтық көшірмені сақтау мерзімі - 0 күн), мониторинг пен Performance Insights өшіруді қоспағанда қалдыра аламыз. Түймені басыңыз Мәліметтер базасын құру:
Жіп өңдеушісі
Соңғы кезең әр екі секунд сайын Кафкадан келетін жаңа деректерді өңдейтін және нәтижені дерекқорға енгізетін Spark тапсырмасын әзірлеу болады.
Жоғарыда айтылғандай, бақылау нүктелері SparkStreaming жүйесіндегі негізгі механизм болып табылады, ол ақауларға төзімділікті қамтамасыз ету үшін конфигурациялануы керек. Біз бақылау нүктелерін қолданамыз және егер процедура сәтсіз болса, Spark Streaming модулі жоғалған деректерді қалпына келтіру үшін соңғы бақылау нүктесіне оралып, одан есептеулерді жалғастыруы керек.
Бақылау нүктесінің ақпараты сақталатын қатеге төзімді, сенімді файлдық жүйеде (мысалы, HDFS, S3 және т.б.) каталог орнату арқылы тексеру нүктесін қосуға болады. Бұл, мысалы:
streamingContext.checkpoint(checkpointDirectory)
Біздің мысалда біз келесі тәсілді қолданамыз, атап айтқанда, егер checkpointDirectory бар болса, контекст бақылау нүктесі деректерінен қайта жасалады. Егер каталог жоқ болса (яғни бірінші рет орындалса), онда functionToCreateContext жаңа контекст жасау және DStreams конфигурациялау үшін шақырылады:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
KafkaUtils кітапханасының createDirectStream әдісі арқылы «транзакция» тақырыбына қосылу үшін DirectStream нысанын жасаймыз:
Spark SQL көмегімен біз қарапайым топтастыруды орындаймыз және нәтижені консольде көрсетеміз:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Сұрау мәтінін алу және оны Spark SQL арқылы іске қосу:
Содан кейін алынған жинақталған деректерді AWS RDS ішіндегі кестеге сақтаймыз. Агрегация нәтижелерін дерекқор кестесіне сақтау үшін DataFrame нысанының жазу әдісін қолданамыз:
AWS RDS қосылымын орнату туралы бірнеше сөз. Біз ол үшін пайдаланушы мен құпия сөзді «AWS PostgreSQL қолдану» қадамында жасадық. Байланыс және қауіпсіздік бөлімінде көрсетілетін дерекқор серверінің URL мекенжайы ретінде соңғы нүктені пайдалану керек:
Spark пен Кафканы дұрыс қосу үшін жұмысты артефакт арқылы smark-submit арқылы орындау керек. spark-streaming-kafka-0-8_2.11. Сонымен қатар, біз PostgreSQL дерекқорымен өзара әрекеттесу үшін артефакт қолданамыз; біз оларды --packages арқылы тасымалдаймыз.
Сценарийдің икемділігі үшін біз кіріс параметрлері ретінде хабар серверінің атын және деректерді алғымыз келетін тақырыпты қосамыз.
Сонымен, жүйенің функционалдығын іске қосу және тексеру уақыты келді:
Барлығы орындалды! Төмендегі суретте көріп отырғаныңыздай, қолданба жұмыс істеп тұрған кезде, жаңа жинақтау нәтижелері әрбір 2 секунд сайын шығарылады, өйткені біз StreamingContext нысанын жасаған кезде пакеттік аралықты 2 секундқа орнаттық:
Содан кейін кестеде жазбалардың бар-жоғын тексеру үшін мәліметтер базасына қарапайым сұраныс жасаймыз транзакция_ағыны:
қорытынды
Бұл мақала Apache Kafka және PostgreSQL-пен бірге Spark Streaming көмегімен ақпаратты ағынмен өңдеу мысалын қарастырды. Әртүрлі көздерден алынған деректердің өсуімен ағынды және нақты уақыттағы қосымшаларды жасау үшін Spark Streaming практикалық мәнін асыра бағалау қиын.
Толық бастапқы кодты менің репозиторийден таба аласыз GitHub.
Мен осы мақаланы талқылауға қуаныштымын, сіздердің пікірлеріңізді күтемін, сонымен қатар барлық қамқор оқырмандардан сындарлы сын күтемін.
Сәттілік тілеймін!
Пс. Бастапқыда жергілікті PostgreSQL дерекқорын пайдалану жоспарланған болатын, бірақ AWS-ге деген сүйіспеншілігімді ескере отырып, мен дерекқорды бұлтқа көшіруді шештім. Осы тақырып бойынша келесі мақалада мен AWS Kinesis және AWS EMR көмегімен жоғарыда сипатталған бүкіл жүйені AWS жүйесінде қалай енгізу керектігін көрсетемін. Жаңалықты бақылаңыз!