Apache Kafka kaj Data Streaming kun Spark Streaming

Saluton, Habr! Hodiaŭ ni konstruos sistemon, kiu prilaboros Apache Kafka-mesaĝon per Spark Streaming kaj skribos la prilaborajn rezultojn al la nuba datumbazo de AWS RDS.

Ni imagu, ke certa kredita institucio metas antaŭ ni la taskon prilabori envenantajn transakciojn "sur la flugo" por ĉiuj siaj branĉoj. Ĉi tio povas esti farita por rapide kalkuli la malferman valutan pozicion por la trezorejo, limoj aŭ financa rezulto pri transakcioj ktp.

Kiel efektivigi ĉi tiun kazon sen la uzo de magiaj kaj magiaj sorĉoj - legu sub la tranĉo! Iru!

Apache Kafka kaj Data Streaming kun Spark Streaming
(Bildofonto)

Enkonduko

Kompreneble, realtempa prilaborado de granda kvanto da datumoj disponigas ampleksajn ŝancojn por uzo en modernaj sistemoj. Unu el la plej popularaj kombinaĵoj por tio estas la tandemo de Apache Kafka kaj Spark Streaming, kie Kafka kreas fluon de envenantaj mesaĝpakaĵoj, kaj Spark Streaming prilaboras ĉi tiujn pakaĵetojn je difinita tempointervalo.

Por plibonigi la misfunkciadon de la aplikaĵo, ni uzos kontrolpunktojn - kontrolpunktojn. Kun ĉi tiu mekanismo, kiam la modulo Spark Streaming bezonas reakiri perditajn datumojn, ĝi nur bezonas reveni al la lasta kontrolpunkto kaj rekomenci kalkulojn de tie.

Arkitekturo de la evoluinta sistemo

Apache Kafka kaj Data Streaming kun Spark Streaming

Uzitaj komponantoj:

  • Apache Kafka estas distribuita publikig-kaj-aboni mesaĝsistemo. Taŭga por eksterreta kaj interreta mesaĝkonsumado. Por malhelpi datumperdon, Kafka mesaĝoj estas stokitaj sur disko kaj reproduktitaj ene de la areto. La Kafka sistemo estas konstruita aldone al la ZooKeeper-sinkroniga servo;
  • Apache Spark Streaming - Spark-komponento por prilaborado de fluaj datumoj. La Spark Streaming-modulo estas konstruita uzante mikro-aran arkitekturon, kie la datumfluo estas interpretita kiel kontinua sekvenco de malgrandaj datumpakaĵoj. Spark Streaming prenas datumojn de malsamaj fontoj kaj kombinas ĝin en malgrandajn pakaĵojn. Novaj pakaĵoj estas kreitaj je regulaj intervaloj. Komence de ĉiu tempointervalo, nova pakaĵeto estas kreita, kaj ĉiuj datumoj ricevitaj dum tiu intervalo estas inkluzivita en la pakaĵeto. Je la fino de la intervalo, paka kresko ĉesas. La grandeco de la intervalo estas determinita per parametro nomita la aro-intervalo;
  • Apache Spark SQL - Kombinas interrilatan prilaboradon kun Spark-funkcia programado. Strukturitaj datumoj rilatas al datumoj, kiuj havas skemon, tio estas, ununuran aron de kampoj por ĉiuj registroj. Spark SQL subtenas enigon de diversaj strukturitaj datumfontoj kaj, pro la ĉeesto de skemaj informoj, ĝi povas efike reakiri nur la postulatajn kampojn de rekordoj, kaj ankaŭ provizas DataFrame-APIojn;
  • AWS RDS estas relative malmultekosta nub-bazita interrilata datumbazo, retservo kiu simpligas aranĝon, operacion kaj skaladon, administrita rekte fare de Amazon.

Instalante kaj funkciigante la Kafka-servilon

Antaŭ uzi Kafka rekte, vi devas certigi, ke vi havas Java, ĉar JVM estas uzata por laboro:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Ni kreu novan uzanton por labori kun Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Poste, elŝutu la distribuan kompleton de la oficiala retejo de Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Malpaku la elŝutitan arkivon:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

La sekva paŝo estas laŭvola. Fakte, la defaŭltaj agordoj ne permesas vin plene uzi ĉiujn funkciojn de Apache Kafka. Ekzemple, forigu temon, kategorion, grupon, pri kiu mesaĝoj povas esti publikigitaj. Por ŝanĝi ĉi tion, ni redaktu la agordan dosieron:

vim ~/kafka/config/server.properties

Aldonu la jenon al la fino de la dosiero:

delete.topic.enable = true

Antaŭ ol komenci la Kafka-servilon, vi devas komenci la ZooKeeper-servilon, ni uzos la helpan skripton kiu venas kun la Kafka distribuo:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Post kiam ZooKeeper komenciĝis sukcese, lanĉu la Kafka-servilon en aparta terminalo:

bin/kafka-server-start.sh config/server.properties

Ni kreu novan temon nomitan Transakcio:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Ni certigu, ke temo kun la bezonata nombro da sekcioj kaj reproduktado estas kreita:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka kaj Data Streaming kun Spark Streaming

Ni maltrafu la momentojn de testado de la produktanto kaj konsumanto por la nove kreita temo. Pliaj detaloj pri kiel vi povas testi sendi kaj ricevi mesaĝojn estas skribitaj en la oficiala dokumentaro - Sendu kelkajn mesaĝojn. Nu, ni daŭrigas verki produktanton en Python uzante la KafkaProducer API.

Verkado de produktanto

La produktanto generos hazardajn datumojn - 100 mesaĝojn ĉiun sekundon. Per hazardaj datumoj ni signifas vortaron konsistantan el tri kampoj:

  • branĉo — nomo de la vendejo de la kredita institucio;
  • monero — transakcia valuto;
  • Kvanto — transakcia kvanto. La kvanto estos pozitiva nombro se ĝi estas aĉeto de valuto de la Banko, kaj negativa nombro se ĝi estas vendo.

La kodo por la produktanto aspektas jene:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Poste, uzante la sendan metodon, ni sendas mesaĝon al la servilo, al la temo, kiun ni bezonas, en formato JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Dum funkciado de la skripto, ni ricevas la sekvajn mesaĝojn en la terminalo:

Apache Kafka kaj Data Streaming kun Spark Streaming

Ĉi tio signifas, ke ĉio funkcias kiel ni volis - la produktanto generas kaj sendas mesaĝojn al la temo, kiun ni bezonas.
La sekva paŝo estas instali Spark kaj prilabori ĉi tiun mesaĝan fluon.

Instalante Apache Spark

Apache Spark estas multflanka kaj alt-efikeca cluster-komputila platformo.

Spark agas pli bone ol popularaj efektivigoj de la modelo MapReduce dum ĝi subtenas pli larĝan gamon da komputadspecoj, inkluzive de interagaj demandoj kaj fluo-prilaborado. Rapido ludas gravan rolon dum prilaborado de grandaj kvantoj da datumoj, ĉar ĝi estas rapideco, kiu permesas vin labori interage sen pasigi minutojn aŭ horojn atendante. Unu el la plej grandaj fortoj de Spark, kiu igas ĝin tiel rapida, estas ĝia kapablo fari en-memorajn kalkulojn.

Ĉi tiu kadro estas skribita en Scala, do vi devas unue instali ĝin:

sudo apt-get install scala

Elŝutu la Spark-distribuon de la oficiala retejo:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Malpaki la arkivon:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Aldonu la vojon al Spark al la bash-dosiero:

vim ~/.bashrc

Aldonu la sekvajn liniojn per la redaktilo:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Rulu la komandon sube post fari ŝanĝojn al bashrc:

source ~/.bashrc

Deplojante AWS PostgreSQL

Restas pligrandigi la datumbazon, kie ni plenigos la prilaboritajn informojn el la riveretoj. Por fari tion, ni uzos la servon AWS RDS.

Iru al la AWS-konzolo -> AWS RDS -> Datumbazoj -> Krei datumbazon:
Apache Kafka kaj Data Streaming kun Spark Streaming

Elektu PostgreSQL kaj alklaku Sekva:
Apache Kafka kaj Data Streaming kun Spark Streaming

Ĉar Ĉi tiu ekzemplo estas nur por edukaj celoj; ni uzos senpagan servilon "minimume" (Senpaga Nivelo):
Apache Kafka kaj Data Streaming kun Spark Streaming

Poste, ni metas tick en la Free Tier-bloko, kaj post tio ni aŭtomate proponos ekzemplon de la t2.micro-klaso - kvankam malforta, ĝi estas senpaga kaj sufiĉe taŭga por nia tasko:
Apache Kafka kaj Data Streaming kun Spark Streaming

Venas tre gravaj aferoj: la nomo de la datumbaza petskribo, la nomo de la majstra uzanto kaj lia pasvorto. Ni nomu la ekzemplon: myHabrTest, majstra uzanto: habr, Pasvorto: habr12345 kaj alklaku la Sekvan butonon:
Apache Kafka kaj Data Streaming kun Spark Streaming

En la sekva paĝo estas parametroj respondecaj pri la alirebleco de nia datumbaza servilo de ekstere (Publika alirebleco) kaj haveno havebleco:

Apache Kafka kaj Data Streaming kun Spark Streaming

Ni kreu novan agordon por la sekureca grupo VPC, kiu permesos eksteran aliron al nia datumbaza servilo per haveno 5432 (PostgreSQL).
Ni iru en aparta retumila fenestro al la AWS-konzolo en la VPC Panelo -> Sekurecaj Grupoj -> Krei sekurecan grupon sekcion:
Apache Kafka kaj Data Streaming kun Spark Streaming

Ni fiksas la nomon por la Sekureca grupo - PostgreSQL, priskribo, specifu al kiu VPC ĉi tiu grupo estu asociita kaj alklaku la butonon Krei:
Apache Kafka kaj Data Streaming kun Spark Streaming

Ni kompletigas por la lastatempe kreitaj grupaj Envenaj reguloj por haveno 5432, kiel montrite en la suba bildo. Vi ne povas specifi la pordon permane, sed elektu PostgreSQL el la fallisto Tipo.

Strikte parolante, la valoro ::/0 signifas la haveblecon de envenanta trafiko por la servilo el la tuta mondo, kio ne estas tute vera kanone, sed por analizi la ekzemplon, ni uzu ĉi tiun aliron:
Apache Kafka kaj Data Streaming kun Spark Streaming

Ni revenas al la retumila paĝo, kie ni havas "Agordu altnivelajn agordojn" malfermita kaj elektu VPC-sekurecajn grupojn -> Elektu ekzistantajn VPC-sekurecajn grupojn -> PostgreSQL en la sekcio:
Apache Kafka kaj Data Streaming kun Spark Streaming

Poste, en la sekcio opcioj de datumbazo -> nomo de datumbazo -> agordu la nomon - habrDB.

Ni povas lasi la reston de la parametroj, escepte de malŝalti sekurkopion (rezervperiodo de konservado - 0 tagoj), monitorado kaj Performance Insights, defaŭlte. Alklaku la butonon Krei datumbazon:
Apache Kafka kaj Data Streaming kun Spark Streaming

Rojo Pritraktilo

La fina etapo estos la disvolviĝo de Spark-laboro, kiu prilaboros novajn datumojn de Kafka ĉiujn du sekundojn kaj enmetos la rezulton en la datumbazon.

Kiel notite supre, kontrolpunktoj estas kerna mekanismo en SparkStreaming, kiu devas esti agordita por certigi misfunkciadon. Ni uzos kontrolpunktojn kaj, se la proceduro malsukcesas, la modulo Spark Streaming nur bezonos reveni al la lasta kontrolpunkto kaj rekomenci kalkulojn de ĝi por reakiri la perditajn datumojn.

Kontrolpunkto povas esti ebligita metante dosierujon sur mistolerema, fidinda dosiersistemo (ekz. HDFS, S3, ktp.) kie la transirejo informoj estos stokitaj. Ĉi tio estas farita kun, ekzemple:

streamingContext.checkpoint(checkpointDirectory)

En nia ekzemplo, ni uzos la sekvan aliron, nome, se la checkpointDirectory ekzistas, tiam la kunteksto estos rekreita de la kontrolpunktodatumoj. Se la dosierujo ne ekzistas (t.e. ĝi estas ekzekutita por la unua fojo), tiam la funkcio functionToCreateContext estas vokita por krei novan kuntekston kaj agordi DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Ni kreas objekton DirectStream por konekti al la temo "transakcio" uzante la metodon createDirectStream de la biblioteko KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Analizante envenantajn datumojn en JSON-formato:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Uzante Spark SQL, ni faras simplan grupigon kaj montras la rezulton en la konzolo:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Akiri la demandtekston kaj ruli ĝin per Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Kaj tiam ni konservas la rezultajn agregitajn datumojn en tabelon en AWS RDS. Por konservi la agregrezultojn al datumbaza tabelo, ni uzos la skribmetodon de la objekto DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Kelkajn vortojn pri agordo de konekto al AWS RDS. Ni kreis la uzanton kaj pasvorton por ĝi ĉe la paŝo "Deplojante AWS PostgreSQL". Kiel la URL de la datumbaza servilo, vi devus uzi la Finpunkton, kiu estas montrata en la sekcio Konektebleco kaj sekureco:

Apache Kafka kaj Data Streaming kun Spark Streaming

Por ĝuste konekti Spark kaj Kafka, vi devus ruli la laboron per smark-submit uzante la artefakton spark-streaming-kafka-0-8_2.11. Aldone, ni ankaŭ uzos artefakton por interagi kun la PostgreSQL-datumbazo; ni translokigos ilin per --pakaĵoj.

Por fleksebleco de la skripto, ni ankaŭ elprenos la nomon de la mesaĝservilo kaj la temon de kiu ni volas ricevi datumojn kiel enigajn parametrojn.

Do, estas tempo kuri kaj testi la sistemon:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Ĉio funkciis! Kiel vi povas vidi en la suba bildo, dum la aplikaĵo funkcias, novaj agregrezultoj estas montrataj ĉiujn 2 sekundojn, ĉar ni agordas la kunigintervalon al 2 sekundoj kiam ni kreis la objekton StreamingContext:

Apache Kafka kaj Data Streaming kun Spark Streaming

Poste, ni faras simplan konsulton al la datumbazo por kontroli la ĉeeston de registroj en la tabelo transakcio_fluo:

Apache Kafka kaj Data Streaming kun Spark Streaming

konkludo

Ĉi tiu artikolo rigardis ekzemplon de fluo-prilaborado de informoj uzante Spark Streaming kune kun Apache Kafka kaj PostgreSQL. Kun la kresko de datumoj de diversaj fontoj, estas malfacile supertaksi la praktikan valoron de Spark Streaming por krei fluajn kaj realtempajn aplikojn.

Vi povas trovi la plenan fontkodon en mia deponejo ĉe GitHub.

Mi feliĉas diskuti ĉi tiun artikolon, mi antaŭĝojas viajn komentojn, kaj mi ankaŭ esperas konstruan kritikon de ĉiuj zorgemaj legantoj.

Mi deziras al vi sukceson!

PS. Komence estis planite uzi lokan PostgreSQL-datumbazon, sed pro mia amo por AWS, mi decidis movi la datumbazon al la nubo. En la sekva artikolo pri ĉi tiu temo, mi montros kiel efektivigi la tutan sistemon priskribitan supre en AWS uzante AWS Kinesis kaj AWS EMR. Sekvu la novaĵojn!

fonto: www.habr.com

Aldoni komenton