Apache Kafka and Streaming Data Processing with Spark Streaming

Hello, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to an AWS RDS cloud database.

Let's imagine that a certain credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This might be done to promptly calculate an open currency position for the treasury, limits or financial results for transactions, and so on.

How to implement this case without resorting to magic and magic spells — read under the cut! Go!

[Cover image] (Image source)

Introduction

Of course, processing large volumes of data in real time opens up wide opportunities for use in modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, where Kafka produces a stream of incoming message packets and Spark Streaming processes those packets at a given time interval.

To improve the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume computations from there.

Architecture of the developed system

[Diagram: architecture of the system]

Components used:

  • Apache Kafka is a distributed publish/subscribe messaging system, suitable for both offline and online message consumption. To prevent data loss, Kafka messages are persisted on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. The Spark Streaming module is built around a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small batches, which are created at regular intervals: at the start of each interval a new batch is created, any data received during that interval is added to it, and at the end of the interval the batch stops growing. The interval size is determined by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data here means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, because schema information is available, can efficiently retrieve only the required fields of records; it also provides the DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation and scaling, and is administered directly by Amazon.

Installing and running the Kafka server

Before using Kafka directly, you need to make sure that Java is available, since the JVM is used to run it:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The point is that the default settings do not let you use all of Apache Kafka's capabilities in full — for example, deleting a topic, i.e. a category or group to which messages can be published. To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true
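
With this setting in place, a topic can actually be deleted with the standard tooling, for example (the topic name here is hypothetical):

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic some_old_topic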

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that ships with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Once ZooKeeper has started successfully, launch the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that the topic was created with the required number of partitions and replicas:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

[Screenshot: output of the describe command]

We will skip testing the producer and consumer for the newly created topic — details on how to test sending and receiving messages are covered in the official documentation, under Send some messages. Now let's move on to writing a producer in Python using the KafkaProducer API.
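
For reference, a quick smoke test with the console tools that ship with the Kafka distribution could look like this (run each command in its own terminal):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transaction
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transaction --from-beginning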

Writing the producer

The producer will generate random data — 100 messages every second. By random data we mean a dictionary consisting of three fields (a sample message is shown after the list):

  • Branch — the name of the credit institution's point of sale;
  • Currency — the transaction currency;
  • Amount — the transaction amount. The amount will be a positive number if it is a purchase of currency by the Bank, and a negative number if it is a sale.
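
For example, a single generated message might look like this (the values are illustrative):

{"branch": "Kazan", "currency": "RUB", "amount": 42}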

The producer code looks like this:

from numpy.random import choice, randint

def get_random_value():
    # build a random transaction record with branch, currency and amount
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Then, using the send method, we send a message to the server, to the topic we need, in JSON format:

from json import dumps
from kafka import KafkaProducer

# serialize each message dictionary to JSON and compress it with gzip
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
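
The snippet above sends a single message. To reach the rate of roughly 100 messages per second described earlier, the send could be wrapped in a simple loop; a minimal sketch (the pacing is approximate and does not account for send latency):

from time import sleep

while True:
    for _ in range(100):                              # ~100 messages...
        producer.send(my_topic, value=get_random_value())
    producer.flush()                                  # make sure the batch is on the wire
    sleep(1)                                          # ...every second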

When the script runs, we see messages like these in the terminal:

[Screenshot: producer messages in the terminal]

This means that everything works as intended: the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a universal, high-performance cluster computing platform.

Spark performs better than popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large volumes of data, since it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and what makes it so fast, is its ability to perform computations in memory.

The framework is written in Scala, so you need to install that first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz
sudo mv spark-2.4.2-bin-hadoop2.7 /usr/local/spark

Add the path to Spark to your bash profile:

vim ~/.bashrc

Add the following lines in the editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after saving the changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will load the processed results from the stream. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database:
[Screenshot: Create database]

Select PostgreSQL and click Next:
[Screenshot: engine selection — PostgreSQL]

Since this example is for educational purposes only, we will use a free server "at the minimum" (Free Tier):
[Screenshot: Free Tier option]

Next, tick the checkbox in the Free Tier block; after that we are automatically offered an instance of the t2.micro class — weak, but free and quite suitable for our task:
[Screenshot: t2.micro instance class]

Next come some very important things: the name of the database instance, the master user name and the password. Let's name the instance: myHabrTest, master user: habr, password: habr12345, and click the Next button:
[Screenshot: instance name and credentials]

On the next page are the parameters responsible for access to our database server from the outside (Public accessibility) and for port availability:

[Screenshot: network and port settings]

Let's create a new setting for the VPC security group that will allow external access to our database server over port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console, to the VPC Dashboard -> Security Groups -> Create security group section:
[Screenshot: Create security group]

Set the name for the Security group — PostgreSQL — add a description, indicate which VPC this group should be associated with, and click the Create button:
[Screenshot: security group settings]

Fill in the Inbound rules for port 5432 for the newly created group, as shown in the picture below. You do not have to enter the port manually — just select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from anywhere in the world, which is not entirely correct canonically, but for the sake of the example let's allow ourselves this approach:
[Screenshot: Inbound rules]

We return to the browser page where we have "Configure advanced settings" open, and in the VPC security groups section select Choose existing VPC security groups -> PostgreSQL:
[Screenshot: selecting the PostgreSQL security group]

Next, under Database options -> Database name, set the name: habrDB.

We can leave the remaining parameters at their defaults, except for disabling backups (backup retention period — 0 days), monitoring and Performance Insights. Click the Create database button:
[Screenshot: final database options]

Stream handler

The final stage is to develop a Spark job that will process new data arriving from Kafka every two seconds and insert the result into the database.

As noted above, checkpoints are the core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints so that, if the procedure fails, the Spark Streaming module only needs to return to the last checkpoint and resume computations from it to recover the lost data.

Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. It is done with, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example we will use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (i.e. this is the first run), then functionToCreateContext is called to create a new context and set up the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
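
The snippet above references functionToCreateContext but does not show it. A minimal sketch of what it could contain, assuming the stream setup from the next snippet is placed inside it:

def functionToCreateContext():
    # build the context and DStreams exactly once, on the first run
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)              # 2-second batch interval
    # ... set up the Kafka DStream here (see the next snippet) ...
    ssc.checkpoint(checkpointDirectory)        # where checkpoint data is stored
    return ssc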

We create a DirectStream object to connect to the transaction topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("transaction_stream")  # the app name here is illustrative
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)                        # 2-second batch interval

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})

Parsing the incoming data in JSON format:

from pyspark.sql import Row

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Using Spark SQL, we perform a simple grouping and print the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Then we save the resulting aggregated data into a table in AWS RDS. To save the aggregation results to a database table, we will use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()
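
For completeness: the snippets above (the Row mapping, the SQL aggregation and the JDBC write) run once per micro-batch, so one plausible way to wire them together — a sketch, not necessarily the author's exact job code — is through foreachRDD on the parsed stream:

from pyspark.sql import SparkSession, Row

def process(time, rdd):
    if rdd.isEmpty():
        return                                      # skip empty micro-batches
    spark = SparkSession.builder.getOrCreate()      # reuse a single session
    rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                   currency=w['currency'],
                                   amount=w['amount']))
    spark.createDataFrame(rowRdd).createOrReplaceTempView("treasury_stream")
    testResultDataFrame = spark.sql(get_sql_query())
    testResultDataFrame.show(n=5)
    # ... JDBC write as shown above ...

parsed.foreachRDD(process)
ssc.start()
ssc.awaitTermination()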

A few words about configuring the connection to AWS RDS. We created the user and password for it at the "Deploying AWS PostgreSQL" step. Use the Endpoint as the database server URL; it is shown in the Connectivity & security section:

[Screenshot: Connectivity & security — Endpoint]

To connect Spark and Kafka correctly, the job should be run via spark-submit using the spark-streaming-kafka-0-8_2.11 artifact. In addition, we will also use an artifact for interacting with the PostgreSQL database; we pass both via --packages.

For flexibility, we will also pass in as input parameters the name of the message broker server and the topic from which we want to receive data.
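
A minimal way to pick those parameters up inside spark_job.py might be the following (a sketch; the actual script's argument handling may differ):

import sys

broker_list = sys.argv[1]    # e.g. localhost:9092
topic = sys.argv[2]          # e.g. transaction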

So, it's time to launch and check how the system works:

spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
    spark_job.py localhost:9092 transaction

Everything worked! As you can see in the picture below, while the application is running new aggregation results are printed every 2 seconds, because we set the batch interval to 2 seconds when creating the StreamingContext object:

[Screenshot: aggregation output every 2 seconds]

Next, we run a simple query against the database to check for records in the transaction_flow table:
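
A check along these lines (run with psql against habrDB; the exact query is illustrative) is enough:

select * from transaction_flow order by curr_time desc limit 10;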

[Screenshot: query result from transaction_flow]

Conclusion

This article looked at an example of stream processing of data using Spark Streaming together with Apache Kafka and PostgreSQL. With the growth of data from a wide variety of sources, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I am happy to discuss this article, I look forward to your comments, and I also hope for constructive criticism from all attentive readers.

I wish you success!

P.S. Initially I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

source: www.habr.com
