Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Silav Habr! Îro em ê pergalek ava bikin ku dê pêlên peyamên Apache Kafka bi karanîna Spark Streaming pêvajoyê bike û encamên pêvajoyê li databasa ewr a AWS RDS binivîsîne.

Werin em bifikirin ku saziyek kredî ya diyarkirî ji me re peywira hilanîna danûstendinên hatina "li ser firînê" li seranserê hemî şaxên wê destnîşan dike. Ev dikare ji bo mebesta hesabkirina tavilê pozîsyonek diravê vekirî ji bo xezîneyê, sînor an encamên darayî yên ji bo danûstendinan, hwd.

Meriv çawa vê dozê bêyî karanîna sêrbaz û sêrbazan bicîh tîne - di bin qutbûnê de bixwînin! Ajotin!

Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming
(Çavkaniya wêneyê)

Pîrozbahiyê

Bê guman, pêvajoyek mezin a daneyan di wextê rast de ji bo karanîna di pergalên nûjen de fersend peyda dike. Yek ji kombînasyona herî populer a ji bo vê yekê tandema Apache Kafka û Spark Streaming e, ku Kafka çemek pakêtên peyamên gihîştî diafirîne, û Spark Streaming van pakêtan di navberek demkî ya diyarkirî de pêvajoyê dike.

Ji bo zêdekirina tolerasyona xeletiya serîlêdanê, em ê nuqteyên kontrolê bikar bînin. Bi vê mekanîzmayê re, dema ku motora Spark Streaming pêdivî ye ku daneyên winda vegerîne, ew tenê hewce dike ku vegere nuqteya kontrolê ya paşîn û hesabên ji wir ji nû ve bide destpêkirin.

Mîmariya pergala pêşkeftî

Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Pêkhateyên bikaranîn:

  • Apache Kafka pergalek peyama weşandin-aboneyê ya belavkirî ye. Hem ji bo xerckirina peyamên negirêdayî hem jî ji bo serhêl maqûl e. Ji bo pêşîgirtina windabûna daneyê, peyamên Kafka li ser dîskê têne hilanîn û di nav komê de têne dubare kirin. Pergala Kafka li ser karûbarê hevdemkirinê ya ZooKeeper hatî çêkirin;
  • Apache Spark Streaming - Parçeyek Spark ji bo hilberandina daneya weşanê. Modula Spark Streaming bi karanîna mîmariya mîkro-hevalê ve hatî çêkirin, ku tê de herika daneyê wekî rêzek domdar a pakêtên daneya piçûk tê şîrove kirin. Spark Streaming daneyan ji çavkaniyên cihêreng digire û di pakêtên piçûk de berhev dike. Di navberên rêkûpêk de pakêtên nû têne afirandin. Di destpêka her navberê de, pakêtek nû tê afirandin, û her daneya ku di wê navberê de hatî wergirtin di pakêtê de tête navandin. Di dawiya navberê de, mezinbûna pakêtê disekine. Mezinahiya navberê ji hêla pîvanek ku jê re navberê batch tê gotin;
  • Apache Spark SQL - Pêvajoya pêwendiyê bi bernameya fonksiyonel a Spark re yek dike. Daneyên birêkûpêk tê wateya daneya ku xwedan şema ye, ango komek zeviyên yekane ji bo hemî tomaran. Spark SQL têketina ji cûrbecûr çavkaniyên daneya birêkûpêk piştgirî dike û, bi saya hebûna agahdariya schema, ew dikare bi bandorkerî tenê qadên tomarên pêwîst bistîne, û API-yên DataFrame jî peyda dike;
  • AWS RDS databasek pêwendiya ewr-based-ê bi erzan e, karûbarê malperê ye ku sazkirin, xebitandin û pîvandinê hêsan dike, û rasterast ji hêla Amazon ve tê rêvebirin.

Sazkirin û xebitandina servera Kafka

Berî ku rasterast Kafka bikar bînin, hûn hewce ne ku pê ewle bin ku Java we heye, ji ber ... JVM ji bo xebatê tê bikar anîn:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Werin em bikarhênerek nû biafirînin ku bi Kafka re bixebite:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Piştre, belavkirinê ji malpera fermî ya Apache Kafka dakêşin:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Arşîva dakêşandî vekin:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Pêngava paşîn vebijarkî ye. Rastî ev e ku mîhengên xwerû rê nadin ku hûn hemî taybetmendiyên Apache Kafka bi tevahî bikar bînin. Mînakî, mijarek, kategorî, grûpek ku peyam dikarin werin weşandin jêbirin. Ji bo guherandina vê, em pelê veavakirinê biguherînin:

vim ~/kafka/config/server.properties

Ya jêrîn li dawiya pelê zêde bikin:

delete.topic.enable = true

Berî destpêkirina servera Kafka, hûn hewce ne ku servera ZooKeeper dest pê bikin; em ê skrîpta alîkar a ku bi belavkirina Kafka re tê bikar bînin:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Piştî ku ZooKeeper bi serfirazî dest pê kir, servera Kafka di termînalek cûda de dest pê bike:

bin/kafka-server-start.sh config/server.properties

Ka em mijarek nû bi navê Danûstandin biafirînin:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Werin em pê ewle bin ku mijarek bi hejmara pêdivî ya dabeşan û dubarekirinê hatîye afirandin:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Werin em demên ceribandina hilberîner û xerîdar ji bo mijara nû hatî afirandin winda bikin. Zêdetir hûrgulî di derbarê ka hûn çawa dikarin şandin û wergirtina peyaman biceribînin di belgeya fermî de têne nivîsandin - Hin peyaman bişînin. Welê, em diçin nivîsandina hilberînerek li Python bi karanîna API-ya KafkaProducer.

Nivîsandina hilberîner

Hilberîner dê daneyên rasthatî çêbike - her saniye 100 peyam. Mebesta me bi daneyên rasthatî ferhengek ji sê qadan pêk tê:

  • Liq - navê cîhê firotanê ya saziya krediyê;
  • Diravcins - pereyê danûstandinê;
  • Biha - mîqdara danûstendinê. Heger ew ji hêla Bankê ve ji hêla Bankê ve bikirin dê hejmarek erênî be, û heke firotek be dê hejmarek neyînî be.

Koda ji bo hilberîner wiha xuya dike:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Dûv re, bi karanîna rêbaza şandinê, em peyamek ji serverê re, ji mijara ku em hewce ne, di formata JSON re dişînin:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Dema ku skrîptê dimeşîne, em di termînalê de peyamên jêrîn digirin:

Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Ev tê vê wateyê ku her tişt wekî ku me dixwest dixebite - hilberîner ji mijara ku em hewce ne re peyaman diafirîne û dişîne.
Pêngava paşîn ev e ku hûn Spark saz bikin û vê herikîna peyamê bişopînin.

Sazkirina Apache Spark

Apache Spark platformek berhevkirina komê ya gerdûnî û performansa bilind e.

Spark ji pêkanînên populer ên modela MapReduce çêtir performans dike dema ku cûrbecûr cûrbecûr hesabkirinê piştgirî dike, di nav de pirsên înteraktîf û pêvajoya tîrêjê. Lezbûn di hilberandina mîqdarên mezin ên daneyê de rolek girîng dilîze, ji ber ku ew leza ye ku dihêle hûn bi înteraktîf bixebitin bêyî ku deqe an demjimêran li bendê derbas bikin. Yek ji hêza herî mezin a Spark ku wê ew qas bilez dike, şiyana wê ya pêkanîna hesabên bîranînê ye.

Ev çarçove li Scala hatî nivîsandin, ji ber vê yekê hûn hewce ne ku pêşî saz bikin:

sudo apt-get install scala

Dabeşkirina Spark ji malpera fermî dakêşin:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Arşîvê vekin:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Rêya Spark li pelê bash zêde bikin:

vim ~/.bashrc

Bi edîtorê rêzên jêrîn zêde bikin:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Piştî ku guheztinên li bashrc bikin, emrê jêrîn bicîh bikin:

source ~/.bashrc

Bicihkirina AWS PostgreSQL

Tiştê ku dimîne ev e ku em databasa ku em ê agahdariya pêvajoyî ji kaniyan barkirin tê de bicîh bikin. Ji bo vê yekê em ê karûbarê AWS RDS bikar bînin.

Herin konsolê AWS -> AWS RDS -> Danegeh -> Database biafirînin:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

PostgreSQL hilbijêrin û Next bikirtînin:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Bo Ev mînak tenê ji bo mebestên perwerdehiyê ye; em ê serverek belaş "bi kêmanî" bikar bînin (Tier Belaş):
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Dûv re, em di bloka Free Tier de tikandinek danîne, û piştî wê ji me re bixweber mînakek çîna t2.micro were pêşkêş kirin - her çend qels be jî, ew ji bo karê me belaş û pir maqûl e:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Piştre tiştên pir girîng têne: Navê mînaka databasê, navê bikarhênerê master û şîfreya wî. Ka em navê nimûneyê bikin: myHabrTest, bikarhênerê sereke: habr, şîfre: habr12345 û li ser bişkoka Next bikirtînin:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Di rûpela paşîn de parametreyên ku ji bo gihîştina servera databasa me ya ji derve (Gihîştina gelemperî) û hebûna portê berpirsiyar in hene:

Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Ka em ji bo koma ewlehiya VPC mîhengek nû biafirînin, ku dê rê bide gihîştina derveyî servera databasa me bi porta 5432 (PostgreSQL).
Ka em di pencereyek gerokek cihêreng de biçin konsoleya AWS-ê berbi VPC Dashboard -> Komên Ewlekariyê -> Beşa Koma Ewlekariyê biafirînin:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Me navê koma Ewlekariyê destnîşan kir - PostgreSQL, ravek, destnîşan dike ku divê ev kom bi kîjan VPC-ê re têkildar be û bişkojka Create bikirtînin:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Ji bo koma ku nû hatî afirandin qaîdeyên Inbound-ê ji bo port 5432 dagirin, wekî ku di wêneya jêrîn de tê xuyang kirin. Hûn nikarin portê bi destan diyar bikin, lê PostgreSQL ji navnîşa dakêşana Type hilbijêrin.

Bi eşkereyî, nirxa ::/0 tê wateya hebûna seyrûsefera gihîştî ya serverê ji çar aliyên cîhanê, ku bi qanonîkî ne bi tevahî rast e, lê ji bo analîzkirina nimûneyê, em bihêlin ku em vê nêzîkatiyê bikar bînin:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Em vedigerin rûpela gerokê, ku me "Mîhengên pêşkeftî mîheng bike" vekiriye û di beşa komên ewlehiyê yên VPC de hilbijêrin -> Komên ewlehiyê yên VPC yên heyî hilbijêrin -> PostgreSQL:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Dûv re, di vebijarkên Database -> Navê Database -> Navê bicîh bikin - habrDB.

Em dikarin parametreyên mayî bihêlin, ji bilî neçalakkirina paşvekêşanê (serdema hilgirtina hilanînê - 0 roj), çavdêrîkirin û Nêrîna Performansê, ji hêla xwerû. Bişkojka bitikîne Database çêbikin:
Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Rêvebirê Mijarê

Qonaxa dawî dê pêşxistina karekî Spark be, ku dê her du saniyeyan carekê daneyên nû yên ji Kafka têne hilanîn û encamê têxe databasê.

Wekî ku li jor hate destnîşan kirin, xalên kontrolê di SparkStreaming de mekanîzmayek bingehîn e ku divê were mîheng kirin da ku tolerasyona xeletiyê misoger bike. Em ê nuqteyên kontrolê bikar bînin û, heke prosedur biser nekeve, modula Spark Streaming tenê hewce dike ku vegere nuqteya kontrolê ya paşîn û hesabên jê ji nû ve bidomîne da ku daneyên wenda vegerîne.

Nîqaşa kontrolê dikare bi danîna pelrêçek li ser pergalek pelê ya bi xeletî, pêbawer (wekî HDFS, S3, hwd.) ku tê de agahdariya xala kontrolê were hilanîn, were çalak kirin. Ev bi kar tîne, wek nimûne:

streamingContext.checkpoint(checkpointDirectory)

Di mînaka xwe de, em ê nêzîkatiya jêrîn bikar bînin, ango, heke checkpointDirectory hebe, wê hingê dê çarçove ji daneyên xala kontrolê ji nû ve were afirandin. Ger pelrêça tine be (ango ji bo cara yekem hatî darve kirin), wê hingê fonksiyona ToCreateContext tê gotin ku çarçoveyek nû biafirîne û DStreams mîheng bike:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Em bi rêbaza createDirectStream ya pirtûkxaneya KafkaUtils ve bi mijara "danûstandin" ve girêdidin tiştek DirectStream:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parvekirina daneyên hatina di formata JSON de:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Bi karanîna Spark SQL, em kombûnek hêsan dikin û encamê di konsolê de nîşan didin:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Nivîsara pirsê bistînin û bi Spark SQL ve bi rê ve bibin:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Dûv re em daneyên berhevkirî yên encam di tabloyek AWS RDS de hilînin. Ji bo tomarkirina encamên berhevkirinê li ser tabloyek databasê, em ê rêbaza nivîsandinê ya objekta DataFrame bikar bînin:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Çend gotin di derbarê sazkirina pêwendiyek bi AWS RDS de. Me bikarhêner û şîfreya wê di pêngava "Bikaranîna AWS PostgreSQL" de çêkir. Pêdivî ye ku hûn Endpoint wekî url-ya servera databasê bikar bînin, ku di beşa Têkilî û Ewlekariyê de tê xuyang kirin:

Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Ji bo ku hûn Spark û Kafka rast bi hev ve girêdin, divê hûn kar bi smark-submit bi karanîna artifactê bimeşînin. spark-streaming-kafka-0-8_2.11. Wekî din, em ê ji bo danûstendina bi databasa PostgreSQL re hunerek jî bikar bînin; em ê wan bi rêya --pakêtan veguhezînin.

Ji bo nermbûna skrîptê, em ê wekî pîvanên têketinê navê servera peyamê û mijara ku em jê dixwazin daneyan bistînin jî têxin nav xwe.

Ji ber vê yekê, ew dem e ku meriv fonksiyona pergalê bide destpêkirin û kontrol bike:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Her tişt pêk hat! Wekî ku hûn di wêneya jêrîn de dibînin, dema ku serîlêdan dimeşîne, her 2 saniyeyan carek encamên berhevkirina nû têne derxistin, ji ber ku me dema ku me tişta StreamingContext çêkir, navbera berhevkirinê li 2 çirkeyan datîne:

Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

Dûv re, em pirsek hêsan ji databasê re dikin da ku hebûna tomarên di tabloyê de kontrol bikin transaction_flow:

Apache Kafka û Pêvajoya Daneyên Streaming bi Spark Streaming

encamê

Vê gotarê li mînakek hilberandina agahdariya bi karanîna Spark Streaming digel Apache Kafka û PostgreSQL nihêrî. Bi mezinbûna daneyan ji çavkaniyên cihêreng, dijwar e ku meriv nirxa pratîkî ya Spark Streaming ji bo afirandina serîlêdanên streaming û-dem-demê zêde bike.

Hûn dikarin koda çavkaniyê ya tevahî di depoya min de bibînin GitHub.

Ez kêfxweş im ku li ser vê gotarê nîqaş bikim, ez li benda şîroveyên we me, û ez jî hêvî dikim ku rexneyên çêker ji hemî xwendevanên dilnerm re.

Ez hêvî dikim serkeftî!

PS. Di destpêkê de hate plan kirin ku databasek PostgreSQL ya herêmî bikar bîne, lê ji ber hezkirina min a ji AWS re, min biryar da ku databasê berbi ewrê vekim. Di gotara din a li ser vê mijarê de, ez ê nîşan bidim ka meriv çawa bi karanîna AWS Kinesis û AWS EMR tevahiya pergala ku li jor li AWS hatî destnîşan kirin bicîh tîne. Nûçeyan bişopînin!

Source: www.habr.com

Add a comment