Apache Kafka and Streaming Data Processing with Spark Streaming

Hi, Habr! Today we will build a system that processes Apache Kafka message streams using Spark Streaming and writes the processing results to an AWS RDS cloud database.

Let's imagine that a certain credit institution has given us the task of processing incoming transactions "on the fly" across all of its branches. This could be needed, for example, to quickly calculate an open currency position for the treasury, limits, or financial results for transactions, etc.

How to implement this case without magic and magic spells - read under the cut! Let's go!


Introduction

Indeed, processing large volumes of data in real time offers ample opportunities for modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming, in which Kafka creates a stream of incoming message packets and Spark Streaming processes these packets at a given interval.

To increase the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming engine needs to recover lost data, it only has to go back to the last checkpoint and resume calculations from there.
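The idea behind checkpointing can be sketched in plain Python (a toy, file-based offset store for illustration only; Spark's actual checkpoint format and recovery logic are different):

```python
import os
import tempfile

def process_stream(messages, checkpoint_path):
    """Process messages, resuming after the last checkpointed offset.
    A toy illustration of the checkpoint idea, not Spark's mechanism."""
    start = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            start = int(f.read())          # last saved position
    processed = []
    for offset in range(start, len(messages)):
        processed.append(messages[offset])
        with open(checkpoint_path, 'w') as f:
            f.write(str(offset + 1))       # record progress after each message
    return processed

path = os.path.join(tempfile.mkdtemp(), 'ckpt')
print(process_stream(['a', 'b', 'c'], path))       # ['a', 'b', 'c']
print(process_stream(['a', 'b', 'c', 'd'], path))  # ['d'] - only the new data
```

After a restart, the second call does not reprocess the first three messages: it reads the saved offset and continues from there, which is exactly the behavior we want from Spark Streaming on failure.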

Architecture of the system

(Diagram: Kafka producer -> Spark Streaming -> AWS RDS PostgreSQL)

Components used:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is a Spark module for processing streaming data. The Spark Streaming module is built on a micro-batch architecture, in which the data stream is interpreted as a continuous sequence of small data packets. Spark Streaming takes data from different sources and combines it into small batches. New batches are created at regular intervals. At the start of each interval a new batch is created, and any data received during that interval is added to it. At the end of the interval, the batch stops growing. The interval size is determined by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark functional programming. Structured data means data that has a schema, i.e. a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the availability of schema information, can efficiently retrieve only the required fields of records; it also provides DataFrame APIs;
  • AWS RDS is a cloud-based relational database, a web service that simplifies setup, operation, and scaling, administered directly by Amazon.
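The micro-batch model from the Spark Streaming bullet above can be illustrated with a toy sketch (plain Python, not Spark code): timestamped events are bucketed into consecutive fixed-width windows of `batch_interval` seconds:

```python
def micro_batches(events, batch_interval):
    """Bucket (timestamp, payload) events into consecutive windows
    of batch_interval seconds - a toy model of micro-batching."""
    if not events:
        return []
    events = sorted(events)
    end = events[0][0] + batch_interval
    batches, current = [], []
    for ts, payload in events:
        while ts >= end:              # close windows that have elapsed
            batches.append(current)
            current = []
            end += batch_interval
        current.append(payload)
    batches.append(current)
    return batches

events = [(0.1, 'a'), (0.5, 'b'), (2.2, 'c'), (4.1, 'd')]
print(micro_batches(events, 2))  # [['a', 'b'], ['c'], ['d']]
```

With a 2-second interval, the first two events fall into one batch and each later event starts filling the next one, which is the same behavior we will configure below when creating the StreamingContext.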

Installing and running the Kafka server

Before using Kafka directly, you need to make sure you have Java, because the JVM is required:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The fact is that the default settings do not let you use all the features of Apache Kafka; for example, you cannot delete a topic, category, or group to which messages can be published. To change this, edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true

Before starting the Kafka server, you need to start the ZooKeeper server; we will use the helper script that comes with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

After ZooKeeper has started successfully, start the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that a topic with the specified number of partitions and replication has been created:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

(Output: topic transaction with 3 partitions, replication factor 1)

We will skip testing the producer and consumer for the newly created topic; details on how to test sending and receiving messages are described in the official documentation under "Send some messages". Instead, we move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data: 100 messages every second. By random data we mean a dictionary with three fields:

  • branch - the name of the credit institution's point of sale;
  • currency - the transaction currency;
  • amount - the transaction amount. The amount is a positive number if it is a purchase of currency by the bank, and a negative number if it is a sale.
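
To make the business goal concrete, here is a minimal sketch (plain Python, with a hypothetical helper `open_position` that is not part of the project code) of the kind of aggregation the streaming job will later perform in SQL: netting amounts per branch and currency:

```python
from collections import defaultdict

def open_position(transactions):
    """Net amount per (branch, currency) pair: positive amounts are
    purchases by the bank, negative amounts are sales. A hypothetical
    helper mirroring the aggregation the Spark job performs later."""
    position = defaultdict(int)
    for t in transactions:
        position[(t['branch'], t['currency'])] += t['amount']
    return dict(position)

sample = [
    {'branch': 'Kazan', 'currency': 'USD', 'amount': 100},
    {'branch': 'Kazan', 'currency': 'USD', 'amount': -30},
    {'branch': 'SPB',   'currency': 'EUR', 'amount': 50},
]
print(open_position(sample))  # {('Kazan', 'USD'): 70, ('SPB', 'EUR'): 50}
```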

The code for the producer looks like this:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Then, using the send method, we send a message to the server, to the topic we need, in JSON format:

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
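The snippet above sends a single message; to reach the stated rate of 100 messages per second it needs a pacing loop. Here is a hedged sketch (it uses the standard-library `random` module instead of numpy for self-containedness, and a pluggable `send` callable standing in for `producer.send` so the loop can be exercised without a broker):

```python
import random
import time

def get_random_value():
    # same message shape as above; stdlib random keeps the sketch self-contained
    return {'branch': random.choice(['Kazan', 'SPB', 'Novosibirsk', 'Surgut']),
            'currency': random.choice(['RUB', 'USD', 'EUR', 'GBP']),
            'amount': random.randint(-100, 100)}

def run_producer(send, seconds, rate=100, sleep=time.sleep):
    """Emit `rate` messages per second for `seconds` seconds.
    `send` stands in for producer.send, so the pacing logic can be
    tested without a broker (hypothetical wrapper, not project code)."""
    sent = 0
    for _ in range(seconds):
        for _ in range(rate):
            send(get_random_value())
            sent += 1
        sleep(1)  # crude pacing: one burst of `rate` messages per second
    return sent

out = []
run_producer(out.append, seconds=1, sleep=lambda s: None)
print(len(out))  # 100
```

In the real script, `send` would be `lambda msg: producer.send(my_topic, msg)`; the no-op `sleep` above just keeps the demonstration instant.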

When running the script, we get the following messages in the terminal:

(Console output: confirmation lines with the topic, partition, and offset of each sent message)

This means everything works as we wanted: the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message stream.

Installing Apache Spark

Apache Spark is a universal, high-performance cluster computing platform.

Spark performs better than popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed plays an important role when processing large amounts of data, since it is speed that lets you work interactively without spending minutes or hours waiting. One of Spark's greatest strengths, and the reason it is so fast, is its ability to perform in-memory computations.

This framework is written in Scala, so you need to install it first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Add the Spark path to the bash profile:

vim ~/.bashrc

Add the following lines through the editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making the changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

All that remains is to deploy the database into which we will upload the processed information from the streams. For this we will use the AWS RDS service.

Go to the AWS console -> AWS RDS -> Databases -> Create database:

Select PostgreSQL and click Next:

Since this example is for educational purposes only, we will use a free "minimal" server (Free Tier):

Next, check the Free Tier box, after which we will automatically be offered an instance of the t2.micro class: weak, but free and quite suitable for our task:

Next come the really important things: the name of the database instance, the name of the master user, and the password. Let's name the instance myHabrTest, master user habr, password habr12345, and click the Next button:

On the next page there are the parameters responsible for access to our database server from outside (Public accessibility) and for port availability:


Let's create a new setting in the VPC security group that will allow external access to our database server via port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console: VPC Dashboard -> Security Groups -> Create security group:

Set the name for the Security group (PostgreSQL), add a description, indicate which VPC this group should be associated with, and click the Create button:

Fill in the Inbound rules for port 5432 for the newly created group, as shown in the picture below. You do not have to specify the port manually; just select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that incoming traffic to the server is allowed from anywhere in the world, which is canonically not quite right, but for the purposes of this example let's allow ourselves this approach:

We return to the browser page with "Configure advanced settings" open and select VPC security groups -> Choose existing VPC security groups -> PostgreSQL:

Next, under Database options -> Database name, set the name: habrDB.

We can leave the remaining parameters at their defaults, apart from disabling backups (backup retention period: 0 days), monitoring, and Performance Insights. Click the Create database button:

Stream handler

The final stage is the development of a Spark job, which will process new data coming from Kafka every two seconds and write the result into the database.

As noted above, checkpoints are the core mechanism in Spark Streaming that must be configured to ensure fault tolerance. We will use checkpoints so that, if the procedure fails, the Spark Streaming module only needs to return to the last checkpoint and resume calculations from it in order to recover the lost data.

Checkpointing is enabled by setting a directory on a fault-tolerant, reliable file system (such as HDFS, S3, etc.) in which the checkpoint information will be stored. This is done with, for example:

streamingContext.checkpoint(checkpointDirectory)

In our example we will use the following approach: if checkpointDirectory exists, the context will be recreated from the checkpoint data. If the directory does not exist (i.e. this is the first run), functionToCreateContext is called to create a new context and set up the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

We create a DirectStream object connected to the transaction topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing the incoming data in JSON format:

from pyspark.sql import Row

# executed inside foreachRDD for each micro-batch; rdd holds parsed dicts
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
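Note that each Kafka message value arrives in the job as a JSON string (that is how the producer serialized it), so a deserialization step precedes the Row mapping above. A minimal illustration with the standard `json` module:

```python
import json

# one raw value as the producer's value_serializer wrote it
raw = '{"branch": "Kazan", "currency": "USD", "amount": 42}'
w = json.loads(raw)            # back to a Python dict
print(w['branch'], w['amount'])  # Kazan 42
```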

Using Spark SQL, we do a simple grouping and print the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Getting the query text and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

We then save the aggregated data into a table in AWS RDS. To save the aggregation results to a database table, we use the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about setting up the connection to AWS RDS. We created the user and password for it in the "Deploying AWS PostgreSQL" step. The Endpoint shown in the Connectivity & Security section should be used as the database server url:


To connect Spark and Kafka correctly, the job should be run via spark-submit using the artifact spark-streaming-kafka-0-8_2.11. We will also use an artifact for interacting with the PostgreSQL database; both are passed via --packages.

For flexibility, we will also pass the name of the message server and the topic from which we want to receive data as script parameters.

So, it's time to launch the system and check how it works:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

Everything worked! As you can see in the picture below, while the application is running, new aggregation results are output every 2 seconds, because we set the batch interval to 2 seconds when creating the StreamingContext object:


Then we run a simple query against the database to check for the presence of records in the transaction_flow table:
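For readers without an RDS instance at hand, the shape of that check can be mimicked locally (sqlite3 standing in for PostgreSQL, purely illustrative; the `transaction_flow` columns match those produced by the aggregation query above):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""CREATE TABLE transaction_flow (
                    curr_time     TEXT,
                    branch_name   TEXT,
                    currency_code TEXT,
                    batch_value   INTEGER)""")
conn.execute("INSERT INTO transaction_flow VALUES "
             "('2019-05-01 12:00:00', 'Kazan', 'USD', 70)")
rows = conn.execute("SELECT branch_name, currency_code, batch_value "
                    "FROM transaction_flow").fetchall()
print(rows)  # [('Kazan', 'USD', 70)]
```

Against the real database, the same SELECT would simply be issued over the JDBC/psql connection using the endpoint and credentials from the RDS setup step.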


Conclusion

This article looked at an example of stream processing using Spark Streaming in conjunction with Apache Kafka and PostgreSQL. With the growth of data from various sources, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I would be happy to discuss this article; I look forward to your comments and also hope for constructive criticism from all caring readers.

I wish you success!

P.S. Initially I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!

Source: www.habr.com
