ʻO Apache Kafka a me Streaming Data Processing me Spark Streaming
Aloha, Habr! I kēia lā e kūkulu mākou i kahi ʻōnaehana e hoʻoponopono i nā kahawai memo Apache Kafka me ka hoʻohana ʻana i ka Spark Streaming a kākau i nā hopena hoʻoili i ka waihona kapuaʻi AWS RDS.
E noʻonoʻo kākou ua hoʻonohonoho kekahi ʻoihana hōʻaiʻē iā mākou i ka hana o ka hoʻomaʻamaʻa ʻana i nā kālepa e hiki mai ana "ma ka lele" ma kāna mau lālā āpau. Hiki ke hana i kēia no ke kumu o ka helu koke ʻana i kahi kūlana kālā hāmama no ka waihona kālā, nā palena a i ʻole nā hopena kālā no nā kālepa, etc.
Pehea e hoʻokō ai i kēia hihia me ka ʻole o ka hoʻohana ʻana i nā kilokilo a me nā kilokilo - heluhelu ma lalo o ka ʻoki! Hele!
ʻOiaʻiʻo, ʻo ka hana ʻana i ka nui o ka ʻikepili i ka manawa maoli e hāʻawi i nā manawa kūpono no ka hoʻohana ʻana i nā ʻōnaehana hou. ʻO kekahi o nā hui kaulana loa no kēia, ʻo ia ka tandem o Apache Kafka a me Spark Streaming, kahi e hana ai ʻo Kafka i kahi kahawai o nā ʻeke memo e hiki mai ana, a ʻo Spark Streaming e hana i kēia mau ʻeke i ka manawa i hāʻawi ʻia.
No ka hoʻonui ʻana i ka hewa o ka noi, e hoʻohana mākou i nā māka. Me kēia hana, i ka wā e pono ai ka ʻenekini Spark Streaming e hoʻihoʻi i ka ʻikepili nalowale, pono ia e hoʻi i ka helu hope loa a hoʻomau i nā helu mai laila.
Hoʻolālā o ka ʻōnaehana i kūkulu ʻia
Nā ʻāpana i hoʻohana ʻia:
Apache Kafka he ʻōnaehana hoʻolaha hoʻolaha hoʻolaha. He kūpono no ka hoʻohana ʻana i nā memo ma waho a me ka pūnaewele. No ka pale ʻana i ka nalowale o ka ʻikepili, mālama ʻia nā memo Kafka ma ka disk a hana hou ʻia i loko o ka pūʻulu. Kūkulu ʻia ka ʻōnaehana Kafka ma luna o ka lawelawe synchronization ZooKeeper;
ʻO Apache Spark Streaming - Māhele Spark no ka hoʻoili ʻana i ka ʻikepili kahe. Hoʻokumu ʻia ka module Spark Streaming me ka hoʻohana ʻana i kahi hoʻolālā micro-batch, kahi i unuhi ʻia ai ke kahawai ʻikepili ma ke ʻano he kaʻina mau o nā ʻeke ʻikepili liʻiliʻi. Lawe ʻo Spark Streaming i ka ʻikepili mai nā kumu like ʻole a hoʻohui ʻia i loko o nā pūʻolo liʻiliʻi. Hana ʻia nā pūʻolo hou i nā manawa maʻamau. I ka hoʻomaka ʻana o kēlā me kēia manawa, hana ʻia kahi ʻeke hou, a ua hoʻokomo ʻia nā ʻikepili i loaʻa i ia wā i loko o ka ʻeke. I ka pau ʻana o ka wā, pau ka ulu ʻana o ka ʻeke. Hoʻoholo ʻia ka nui o ka wā e kahi ʻāpana i kapa ʻia ʻo ka wā hui;
ʻO Apache Spark SQL - hoʻohui i ka hoʻoponopono pili me ka hoʻolālā hana Spark. ʻO ka ʻikepili i kūkulu ʻia, ʻo ia hoʻi ka ʻikepili i loaʻa kahi schema, ʻo ia hoʻi, hoʻokahi pūʻulu kahua no nā moʻolelo āpau. Kākoʻo ʻo Spark SQL i ka hoʻokomo ʻana mai nā kumu ʻikepili i kūkulu ʻia a, mahalo i ka loaʻa ʻana o ka ʻike schema, hiki iā ia ke kiʻi maikaʻi wale i nā kahua i koi ʻia o nā moʻolelo, a hāʻawi pū i nā API DataFrame;
AWS RDS ʻO ia kahi ʻikepili pili i ke ao, lawelawe pūnaewele e hoʻomaʻamaʻa i ka hoʻonohonoho, ka hana a me ka scaling, a lawelawe pololei ʻia e Amazon.
Ke hoʻouka a me ka holo ʻana i ke kikowaena Kafka
Ma mua o ka hoʻohana pono ʻana iā Kafka, pono ʻoe e hōʻoia iā ʻoe iā Java, no ka mea... Hoʻohana ʻia ʻo JVM no ka hana:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
ʻO ka hana aʻe he koho. ʻO ka ʻoiaʻiʻo ʻaʻole ʻae nā hoʻonohonoho paʻamau iā ʻoe e hoʻohana piha i nā mana āpau o Apache Kafka. No ka laʻana, holoi i kahi kumuhana, ʻano, hui i hiki ke paʻi ʻia nā memo. No ka hoʻololi i kēia, e hoʻoponopono i ka faila hoʻonohonoho:
vim ~/kafka/config/server.properties
E hoʻohui i kēia i ka hope o ka faila:
delete.topic.enable = true
Ma mua o ka hoʻomaka ʻana i ka server Kafka, pono ʻoe e hoʻomaka i ka server ZooKeeper; e hoʻohana mākou i ka palapala kōkua e hele mai me ka hāʻawi Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Ma hope o ka hoʻomaka maikaʻi ʻana o ZooKeeper, e hoʻomaka i ka server Kafka ma kahi kikowaena ʻokoʻa:
E nalo kākou i nā manawa o ka hoʻāʻo ʻana i ka mea hana a me ka mea kūʻai aku no ke kumuhana hou. ʻO nā kikoʻī hou aʻe e pili ana i ke ʻano e hiki ai iā ʻoe ke hoʻāʻo i ka hoʻouna ʻana a me ka loaʻa ʻana o nā leka i kākau ʻia ma ka palapala kūhelu - E hoʻouna i kekahi mau leka. ʻAe, neʻe mākou e kākau i kahi mea hana ma Python e hoʻohana ana i ka KafkaProducer API.
Mea kākau kākau
E hoʻopuka ka mea hana i ka ʻikepili maʻamau - 100 mau memo i kēlā me kēia kekona. ʻO ka ʻikepili maʻamau ke ʻōlelo nei mākou he puke wehewehe ʻōlelo ʻekolu mau kahua:
Pālā - ka inoa o ke kahua kūʻai o ka hale kūʻai;
kālā - kālā kālepa;
huina — ka nui o ka hana. He helu maikaʻi ka huina inā he kūʻai kālā ia e ka Bank, a he helu maikaʻi ʻole inā he kūʻai.
A laila, me ka hoʻohana ʻana i ke ʻano hoʻouna, hoʻouna mākou i kahi leka i ke kikowaena, i ke kumuhana a mākou e pono ai, ma JSON format:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
I ka holo ʻana i ka palapala, loaʻa iā mākou nā memo ma ka pahu:
ʻO ia ke ʻano o nā mea a pau e like me kā mākou makemake - hana ka mea hana a hoʻouna i nā leka i ke kumuhana a mākou e pono ai.
ʻO ka hana aʻe e hoʻokomo iā Spark a hana i kēia kahawai memo.
Ke hoʻouka nei iā Apache Spark
Apache Spark he kahua hoʻopili pūʻulu hui honua a me ka hana kiʻekiʻe.
ʻOi aku ka maikaʻi o ka Spark ma mua o nā hoʻokō kaulana o ke kumu hoʻohālike ʻo MapReduce ʻoiai e kākoʻo ana i kahi ākea o nā ʻano helu helu, me nā nīnau pili a me ka hoʻoili kahawai. He kuleana koʻikoʻi ka wikiwiki i ka wā e hoʻoili ai i ka nui o ka ʻikepili, ʻoiai ʻo ia ka wikiwiki e hiki ai iā ʻoe ke hana me ka ʻole o ka hoʻolilo ʻana i mau minuke a i ʻole mau hola e kali ana. ʻO kekahi o ka ikaika nui o Spark e wikiwiki loa ai, ʻo ia ka hiki ke hana i nā helu hoʻomanaʻo.
Ua kākau ʻia kēia ʻano hana ma Scala, no laila pono ʻoe e hoʻokomo iā ia ma mua:
sudo apt-get install scala
Hoʻoiho i ka hāʻawi Spark mai ka pūnaewele official:
E holo i ke kauoha ma lalo nei ma hope o ka hoʻololi ʻana i ka bashrc:
source ~/.bashrc
Ke hoʻohana nei i ka AWS PostgreSQL
ʻO nā mea a pau i koe, ʻo ia ke kau ʻana i ka waihona ʻikepili kahi e hoʻouka ai mākou i ka ʻike i hoʻoponopono ʻia mai nā kahawai. No kēia e hoʻohana mākou i ka lawelawe AWS RDS.
E hele i ka console AWS -> AWS RDS -> Nā ʻikepili -> Hana i ka waihona:
E koho i ka PostgreSQL a kaomi Next:
No ka mea No nā kumu hoʻonaʻauao wale nō kēia laʻana; e hoʻohana mākou i kahi kikowaena manuahi "ma ka liʻiliʻi loa" (Free Tier):
Ma hope aʻe, kau mākou i kahi kiko i ka poloka Free Tier, a ma hope o ia e hāʻawi ʻia mākou i kahi laʻana o ka papa t2.micro - ʻoiai nāwaliwali, manuahi a kūpono loa no kā mākou hana:
E hele mai ana nā mea nui loa: ka inoa o ka waihona waihona, ka inoa o ka haku mea hoʻohana a me kāna ʻōlelo huna. E inoa i ka laʻana: myHabrTest, haku mea hoʻohana: habr, hua huna: habr12345 a kaomi ma ka pihi Next:
Ma ka ʻaoʻao aʻe, aia nā ʻāpana e pili ana i ka hiki ʻana o kā mākou kikowaena waihona mai waho (Ka hiki i ka lehulehu) a me ka loaʻa ʻana o ke awa:
E hana mākou i kahi hoʻonohonoho hou no ka hui palekana VPC, kahi e hiki ai ke komo i waho i kā mākou kikowaena waihona ma o ke awa 5432 (PostgreSQL).
E hele kāua i ka console AWS ma kahi puka aniani ʻokoʻa i ka VPC Dashboard -> Pūʻulu Palekana -> Hana i ka ʻāpana pūʻulu palekana:
Hoʻonoho mākou i ka inoa no ka pūʻulu Security - PostgreSQL, kahi wehewehe, e hōʻike i ka VPC e pili ai kēia pūʻulu a kaomi i ka pihi Create:
E hoʻopiha i nā lula Inbound no ka awa 5432 no ka hui hou i hana ʻia, e like me ka mea i hōʻike ʻia ma ke kiʻi ma lalo nei. ʻAʻole hiki iā ʻoe ke kuhikuhi i ke awa me ka lima, akā koho i ka PostgreSQL mai ka Type drop-down list.
ʻO ka ʻōlelo koʻikoʻi, ʻo ka waiwai ::/0 ʻo ia ka loaʻa ʻana o nā kaʻa e hiki mai ana i ka server mai nā wahi a pau o ka honua, ʻaʻole maoli maoli ka canonically, akā no ka nānā ʻana i ka laʻana, e ʻae mākou iā mākou iho e hoʻohana i kēia ala:
Hoʻi mākou i ka ʻaoʻao polokalamu kele pūnaewele, kahi i wehe ʻia ai "Configure advanced settings" a koho i ka ʻāpana o nā hui palekana VPC -> E koho i nā pūʻulu palekana VPC i loaʻa -> PostgreSQL:
A laila, i nā koho Database -> inoa waihona -> hoʻonoho i ka inoa - habrDB.
Hiki iā mākou ke waiho i nā ʻāpana i koe, koe wale nō ka hoʻopau ʻana i ka hoʻihoʻi ʻana (manawa mālama paʻa - 0 mau lā), ka nānā ʻana a me ka Performance Insights, ma ka paʻamau. Kaomi ma ke pihi Hana i ka hōkeo ʻikepili:
Mea hoʻopaʻa kaula
ʻO ka pae hope loa, ʻo ia ka hoʻomohala ʻana i kahi hana Spark, kahi e hoʻoponopono ai i nā ʻikepili hou mai Kafka i kēlā me kēia ʻelua kekona a hoʻokomo i ka hopena i loko o ka waihona.
E like me ka mea i hōʻike ʻia ma luna nei, ʻo nā mākaʻi he ʻano kumu nui i SparkStreaming e pono e hoʻonohonoho ʻia e hōʻoia i ka hoʻomanawanui hewa. E hoʻohana mākou i nā wahi hōʻoia a, inā hāʻule ke kaʻina hana, pono wale ka Spark Streaming module e hoʻi i ka helu hope loa a hoʻomau i ka helu ʻana mai ia mea e hoʻihoʻi i ka ʻikepili nalowale.
Hiki ke hoʻohana ʻia ka mākaʻikaʻi ma ka hoʻonohonoho ʻana i kahi papa kuhikuhi ma kahi ʻōnaehana file hiki ke ʻae i ka hewa (e like me HDFS, S3, etc.) kahi e mālama ʻia ai ka ʻike kikoʻī. Hana ʻia kēia me ka hoʻohana ʻana, no ka laʻana:
streamingContext.checkpoint(checkpointDirectory)
Ma kā mākou laʻana, e hoʻohana mākou i kēia ala, ʻo ia hoʻi, inā loaʻa ka checkpointDirectory, a laila e hana hou ʻia ka pōʻaiapili mai ka ʻike kikoʻī. Inā ʻaʻole i loaʻa ka papa kuhikuhi (ʻo ia hoʻi ka hoʻokō ʻia no ka manawa mua), a laila kāhea ʻia ka functionToCreateContext e hana i kahi pōʻaiapili hou a hoʻonohonoho i nā DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Hana mākou i kahi mea DirectStream e hoʻopili ai i ke kumuhana "transaction" me ka hoʻohana ʻana i ke ʻano createDirectStream o ka waihona KafkaUtils:
Ke hoʻohana nei iā Spark SQL, hana mākou i kahi hui maʻalahi a hōʻike i ka hopena i ka console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Ke kiʻi ʻana i ka kikokikona nīnau a me ka holo ʻana ma o Spark SQL:
A laila mālama mākou i ka ʻikepili i hōʻuluʻulu ʻia i loko o kahi papa ma AWS RDS. No ka mālama ʻana i nā hopena hōʻuluʻulu i kahi papa ʻikepili, e hoʻohana mākou i ke ʻano kākau o ka mea DataFrame:
He mau huaʻōlelo e pili ana i ka hoʻonohonoho ʻana i kahi pilina i AWS RDS. Ua hana mākou i ka mea hoʻohana a me ka ʻōlelo huna no ia ma ke kaʻina "Deploying AWS PostgreSQL". Pono e hoʻohana ʻia ka Endpoint ma ke ʻano he url kikowaena waihona, i hōʻike ʻia ma ka ʻāpana Connectivity & Security:
I mea e hoʻopili pono ai iā Spark a me Kafka, pono ʻoe e holo i ka hana ma o smark-submit me ka hoʻohana ʻana i ka artifact. spark-streaming-kafka-0-8_2.11. Eia hou, e hoʻohana pū mākou i kahi artifact no ka launa pū ʻana me ka waihona PostgreSQL; e hoʻoili mākou iā lākou ma o --packages.
No ka maʻalahi o ka palapala, e hoʻokomo pū mākou i ka inoa o ke kikowaena memo a me ke kumuhana a mākou e makemake ai e loaʻa ka ʻikepili.
No laila, ʻo ka manawa kēia e hoʻomaka a nānā i ka hana o ka ʻōnaehana:
Ua holo pono nā mea a pau! E like me kāu e ʻike ai ma ke kiʻi ma lalo nei, ʻoiai e holo ana ka noi, hoʻopuka ʻia nā hopena hōʻuluʻulu hou i kēlā me kēia 2 kekona, no ka mea, ua hoʻonohonoho mākou i ka wā hoʻopaʻa i 2 kekona i ka wā i hana ai mākou i ka mea StreamingContext:
A laila, hana mākou i kahi nīnau maʻalahi i ka waihona e nānā i ka noho ʻana o nā moʻolelo ma ka papa transaction_flow:
hopena
Ua nānā kēia ʻatikala i kahi hiʻohiʻona o ke kahe ʻana o ka ʻike e hoʻohana ana i ka Spark Streaming i hui pū me Apache Kafka a me PostgreSQL. Me ka ulu ʻana o ka ʻikepili mai nā kumu like ʻole, paʻakikī ke hoʻonui i ka waiwai kūpono o Spark Streaming no ka hoʻokumu ʻana i nā noi streaming a me ka manawa maoli.
Hiki iā ʻoe ke loaʻa ka code kumu piha i kaʻu waihona ma GitHub.
Hauʻoli wau i ke kūkākūkā ʻana i kēia ʻatikala, ke kakali nei au i kāu mau manaʻo, a ke manaʻolana nei hoʻi au i nā ʻōlelo hoʻohewa maikaʻi mai nā mea heluhelu aloha a pau.
Makemake wau e holomua ʻoe!
PS. I ka hoʻomaka ʻana ua hoʻolālā ʻia e hoʻohana i kahi waihona PostgreSQL kūloko, akā hāʻawi ʻia i koʻu aloha no AWS, ua hoʻoholo wau e hoʻoneʻe i ka waihona i ke ao. Ma ka ʻatikala e pili ana i kēia kumuhana, e hōʻike wau pehea e hoʻokō ai i ka ʻōnaehana holoʻokoʻa i hōʻike ʻia ma luna nei ma AWS me ka hoʻohana ʻana iā AWS Kinesis a me AWS EMR. E hahai i ka nūhou!