Apache Kafka and Streaming Data Processing with Spark Streaming
Hello, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to an AWS RDS cloud database.
Imagine that a certain credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This might be done in order to quickly calculate an open currency position for the treasury, limits, or financial results for transactions, and so on.
How to implement this use case without resorting to magic and spells — read on under the cut! Let's go!
Of course, processing large volumes of data in real time opens up plenty of opportunities for use in modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka creates a stream of incoming message packets and Spark Streaming processes those packets at a given time interval.
To improve the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume computation from there.
Architecture of the developed system
Components used:
Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are persisted on disk and replicated within the cluster. The Kafka system is built on top of the ZooKeeper synchronization service;
Apache Spark Streaming is a Spark component for processing streaming data. Spark Streaming is built on a micro-batch architecture, in which a data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small packets, which are created at regular intervals. At the start of each interval a new packet is created, and any data received during that interval is added to the packet; at the end of the interval the packet stops growing. The size of the interval is determined by a parameter called the batch interval;
Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the availability of schema information, can efficiently retrieve only the required fields of records; it also provides DataFrame APIs;
Amazon RDS (AWS RDS) is an inexpensive cloud-based relational database, a web service that simplifies setup, operation, and scaling, and is administered directly by Amazon.
Installing and running the Kafka server
Before using Kafka directly, you need to make sure you have Java, since the JVM is used to run it:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
The next step is optional. The point is that the default settings do not let you use all the features of Apache Kafka. For example, you cannot delete a topic — a category, a feed to which messages can be published. To change this, let's edit the configuration file:
vim ~/kafka/config/server.properties
Add the following to the end of the file:
delete.topic.enable = true
Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that ships with the Kafka distribution:
cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
After ZooKeeper has started successfully, launch the Kafka server in a separate terminal:
bin/kafka-server-start.sh config/server.properties
Let's also take a moment to test the producer and consumer for the newly created topic. Details on how to test sending and receiving messages are given in the official documentation — Send some messages. Now we move on to writing a producer in Python using the KafkaProducer API.
Writing the producer
The producer will generate random data: 100 messages every second. By random data we mean a dictionary consisting of three fields:
Branch — the name of the credit institution's point of sale;
Currency — the transaction currency;
Amount — the transaction amount. The amount will be a positive number if it is a purchase of currency by the bank, and a negative number if it is a sale.
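The article does not show the generator itself, so here is a minimal sketch of what a get_random_value() helper producing such a dictionary could look like. The field names follow the description above; the branch and currency lists are illustrative assumptions, not taken from the original code:

```python
import random

def get_random_value():
    """Return one random transaction dictionary with the three fields
    described above (branch, currency, amount). The value lists here
    are made up for illustration."""
    branches = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currencies = ['RUB', 'USD', 'EUR', 'GBP']
    return {
        'branch': random.choice(branches),
        'currency': random.choice(currencies),
        # positive -> the bank buys currency, negative -> the bank sells
        'amount': random.randint(-100, 100),
    }
```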
Then, using the send method, we send a message to the server, to the topic we need, in JSON format:
from json import dumps
from kafka import KafkaProducer

# serialize each message dictionary to JSON and compress it with gzip
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')

my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)
    print('--> The message has been sent to a topic: {}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))
finally:
    producer.flush()
When the script is run, we see the following messages in the terminal:
This means that everything works as intended: the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.
Installing Apache Spark
Apache Spark is a universal, high-performance cluster computing platform.
Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large volumes of data, since it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform computations in memory.
This framework is written in Scala, so you need to install it first:
sudo apt-get install scala
Download the Spark distribution from the official website:
After adding the required changes to bashrc, run the command below:
source ~/.bashrc
Deploying AWS PostgreSQL
All that remains is to deploy the database into which we will load the processed data from the stream. For this we will use the AWS RDS service.
Go to the AWS console -> AWS RDS -> Databases -> Create database:
Select PostgreSQL and click Next:
Since this example is for educational purposes only, we will use a free server "at the minimum" (Free Tier):
Next, check the box in the Free Tier block, after which we will automatically be offered an instance of the t2.micro class: although it is weak, it is free and quite suitable for our task:
Next come some very important things: the database instance name, the master user name, and the password. Let's name the instance myHabrTest, master user: habr, password: habr12345, and click the Next button:
On the next page are the parameters responsible for the accessibility of our database server from outside (Public accessibility) and for port availability:
Let's create a new setting for the VPC security group, which will allow external access to our database server through port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console, to the VPC Dashboard -> Security Groups -> Create security group section:
We set the name for the Security group — PostgreSQL — add a description, indicate which VPC this group should be associated with, and click the Create button:
Fill in the Inbound rules for port 5432 for the newly created group, as shown in the picture below. You don't have to specify the port manually; instead, select PostgreSQL from the Type drop-down list.
Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from anywhere in the world, which is not entirely correct, but for the purposes of this example let's allow ourselves this approach:
We return to the browser page where we have "Configure advanced settings" open, and in the VPC security groups section select -> Choose existing VPC security groups -> PostgreSQL:
Next, in Database options -> Database name -> set the name — habrDB.
We can leave the remaining parameters at their defaults, except for disabling backups (backup retention period — 0 days), monitoring, and Performance Insights. Click the Create database button:
The stream handler
The final stage is developing the Spark job, which will process new data arriving from Kafka every two seconds and insert the result into the database.
As noted above, checkpoints are a core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints, and if the procedure fails, the Spark Streaming engine will only need to return to the last checkpoint and resume computation from it to recover the lost data.
Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) where the checkpoint information will be stored. This is done with, for example:
streamingContext.checkpoint(checkpointDirectory)
In our example we will use the following approach: if checkpointDirectory exists, the context will be recreated from the checkpoint data. If the directory does not exist (i.e., this is the first run), then functionToCreateContext is called to create a new context and set up the DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:
Using Spark SQL, we perform a simple grouping and output the result to the console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
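To make the aggregation concrete, here is a plain-Python sketch (independent of Spark) of what this query computes for one micro-batch: the sum of amounts per (branch, currency) pair. The sample records are made up for illustration:

```python
from collections import defaultdict

def aggregate_batch(records):
    """Sum transaction amounts per (branch, currency), mirroring the
    GROUP BY in the Spark SQL query above."""
    totals = defaultdict(float)
    for r in records:
        totals[(r['branch'], r['currency'])] += r['amount']
    return dict(totals)

# Illustrative micro-batch: two USD trades in SPB net out to +70
batch = [
    {'branch': 'SPB', 'currency': 'USD', 'amount': 100},
    {'branch': 'SPB', 'currency': 'USD', 'amount': -30},
    {'branch': 'Kazan', 'currency': 'EUR', 'amount': 50},
]
```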
We get the query text and run it through Spark SQL:
Then we save the resulting aggregated data into a table in AWS RDS. To save the aggregation results to a database table, we will use the write method of the DataFrame object:
A few words about configuring the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. As the database server url you should use the Endpoint, which is shown in the Connectivity & security section:
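As a sketch, the connection parameters could be assembled like this. The endpoint value is a placeholder (substitute the Endpoint of your own RDS instance); the driver class name is the standard one for the PostgreSQL JDBC driver, and the commented-out write call shows one possible way to append the DataFrame:

```python
# Hypothetical endpoint -- replace with the Endpoint shown in the
# Connectivity & security section of your RDS instance.
db_host = 'myhabrtest.xxxxxxxxxx.us-east-1.rds.amazonaws.com'
jdbc_url = 'jdbc:postgresql://{}:5432/habrDB'.format(db_host)

connection_properties = {
    'user': 'habr',
    'password': 'habr12345',
    'driver': 'org.postgresql.Driver',  # standard PostgreSQL JDBC driver class
}

# The aggregated DataFrame could then be written with something like:
# df.write.jdbc(url=jdbc_url, table='transaction_flow',
#               mode='append', properties=connection_properties)
```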
To link Spark and Kafka correctly, you should run the job via spark-submit using the artifact spark-streaming-kafka-0-8_2.11. In addition, we will also use an artifact for interacting with the PostgreSQL database; we will pass both of them via --packages.
For script flexibility, we will also include as input parameters the name of the message server and the topic from which we want to receive data.
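A minimal sketch of how those two launch parameters could be parsed in the job script; the flag names and defaults are illustrative assumptions, not taken from the article's code:

```python
import argparse

def parse_args(argv=None):
    """Parse the launch parameters mentioned above: the Kafka broker
    address and the topic to consume from."""
    parser = argparse.ArgumentParser(description='Kafka -> Spark Streaming job')
    parser.add_argument('--bootstrap-servers', default='localhost:9092',
                        help='Kafka message server address')
    parser.add_argument('--topic', default='transaction',
                        help='Kafka topic to receive data from')
    return parser.parse_args(argv)
```

The job would then be launched, for example, as `spark-submit ... job.py --bootstrap-servers localhost:9092 --topic transaction`.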
So, it's time to launch and test the whole system:
Everything works! As you can see in the picture below, while the application is running, new aggregation results are output every 2 seconds, because we set the batching interval to 2 seconds when we created the StreamingContext object:
Next, we run a simple query against the database to check for records in the transaction_flow table:
Conclusion
This article looked at an example of stream processing of data using Spark Streaming in combination with Apache Kafka and PostgreSQL. With the growth of data from various sources, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.
You can find the full source code in my repository on GitHub.
I am happy to discuss this article; I look forward to your comments, and I also hope for constructive criticism from all caring readers.
I wish you success!
P.S. Initially it was planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the whole system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!