Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Dia duit, Habr! Sa lá atá inniu tógfaimid córas a phróiseálfaidh sruthanna teachtaireachta Apache Kafka ag baint úsáide as Spark Streaming agus scríobhfaimid na torthaí próiseála chuig bunachar sonraí scamall AWS RDS.

Samhlóimid go gcuireann institiúid chreidmheasa áirithe an tasc orainn idirbhearta ag teacht isteach a phróiseáil “ar an eitilt” thar a brainsí go léir. Is féidir é seo a dhéanamh chun staid airgeadra oscailte a ríomh go pras don státchiste, teorainneacha nó torthaí airgeadais le haghaidh idirbheart, etc.

Conas an cás seo a chur i bhfeidhm gan úsáid a bhaint as geasa draíochta agus draíochta - léigh faoin gearrtha! Téigh!

Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming
(Foinse íomhá)

Réamhrá

Ar ndóigh, cuireann próiseáil líon mór sonraí i bhfíor-am neart deiseanna ar fáil le húsáid i gcórais nua-aimseartha. Is é ceann de na teaglamaí is mó tóir air seo ná an teaglaim de Apache Kafka agus Spark Streaming, áit a gcruthaíonn Kafka sruth de phaicéid teachtaireachta ag teacht isteach, agus déanann Spark Streaming na paicéid seo a phróiseáil ag eatramh ama ar leith.

Chun caoinfhulaingt locht an iarratais a mhéadú, úsáidfimid seicphointí. Leis an meicníocht seo, nuair is gá don inneall Spark Streaming sonraí a cailleadh a aisghabháil, ní gá dó ach dul ar ais go dtí an seicphointe deireanach agus ríomhanna a atosú as sin.

Ailtireacht an chórais fhorbartha

Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Comhpháirteanna a úsáideadh:

  • Apache Kafka is córas teachtaireachtaí dáilte foilsigh-síntiúis é. Oiriúnach do thomhaltas teachtaireachtaí as líne agus ar líne. Chun caillteanas sonraí a chosc, déantar teachtaireachtaí Kafka a stóráil ar dhiosca agus a mhacasamhlú laistigh den bhraisle. Tá an córas Kafka tógtha ar bharr na seirbhíse sioncrónaithe ZooKeeper;
  • Sruthú Spark Apache - Comhpháirt spréach chun sonraí sruthú a phróiseáil. Tógtar an modúl Spark Streaming ag baint úsáide as ailtireacht micrea-bhaisc, áit a ndéantar an sruth sonraí a léirmhíniú mar sheicheamh leanúnach de phaicéid sonraí beaga. Glacann Spark Streaming sonraí ó fhoinsí éagsúla agus comhcheanglaítear iad i bpacáistí beaga. Cruthaítear pacáistí nua go tráthrialta. Ag tús gach eatraimh ama, cruthaítear paicéad nua, agus cuirtear aon sonraí a fhaightear le linn an eatraimh sin san áireamh sa phaicéad. Ag deireadh an eatraimh, stopann fás paicéad. Déantar méid an eatramh a chinneadh ag paraiméadar ar a dtugtar an t-eatramh bhaisc;
  • Spark apache - nascann sé próiseáil choibhneasta le ríomhchlárú feidhmiúil Spark. Ciallaíonn sonraí struchtúrtha sonraí a bhfuil scéimre acu, is é sin, sraith amháin réimsí do gach taifead. Tacaíonn Spark SQL le hionchur ó fhoinsí éagsúla sonraí struchtúrtha agus, a bhuí le faisnéis scéimre a bheith ar fáil, ní féidir leis ach na réimsí riachtanacha taifead a aisghabháil go héifeachtach, agus soláthraíonn sé API DataFrame freisin;
  • AWS RDS Is bunachar sonraí coibhneasta scamall-bhunaithe réasúnta saor é, seirbhís gréasáin a shimplíonn socrú, oibriú agus scálú, agus a riarann ​​Amazon go díreach.

Suiteáil agus rith freastalaí Kafka

Sula n-úsáideann tú Kafka go díreach, ní mór duit a chinntiú go bhfuil Java agat, mar gheall ar ... Úsáidtear JVM don obair:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Cruthaimis úsáideoir nua le bheith ag obair le Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Ansin, íoslódáil an dáileadh ó láithreán gréasáin oifigiúil Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Díphacáil an chartlann íoslódála:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Tá an chéad chéim eile roghnach. Is é an fírinne nach gceadaíonn na socruithe réamhshocraithe duit cumais uile Apache Kafka a úsáid go hiomlán. Mar shampla, scrios topaic, catagóir, grúpa ar féidir teachtaireachtaí a fhoilsiú chucu. Chun é seo a athrú, cuirimis an comhad cumraíochta in eagar:

vim ~/kafka/config/server.properties

Cuir iad seo a leanas le deireadh an chomhaid:

delete.topic.enable = true

Sula dtosaíonn tú ar an bhfreastalaí Kafka, ní mór duit an freastalaí ZooKeeper a thosú; úsáidfimid an script chúnta a thagann le dáileadh Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Tar éis do ZooKeeper tosú go rathúil, seol an freastalaí Kafka i gcríochfort ar leith:

bin/kafka-server-start.sh config/server.properties

Cruthaimis topaic nua dar teideal Idirbheart:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Déanaimis deimhin de go bhfuil topaic cruthaithe ina bhfuil an líon riachtanach deighiltí agus macasamhlú:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Déanaimis dearmad ar na huaireanta tástála ar an táirgeoir agus ar an tomhaltóir don ábhar nuachruthaithe. Tá tuilleadh sonraí maidir le conas is féidir leat teachtaireachtaí a sheoladh agus a fháil scríofa sa doiciméadú oifigiúil - Seol roinnt teachtaireachtaí. Bhuel, bogaimid ar aghaidh chuig léiritheoir a scríobh i Python ag baint úsáide as an KafkaProducer API.

Scríobh léiritheoir

Ginfidh an táirgeoir sonraí randamacha - 100 teachtaireacht gach soicind. Ciallaíonn sonraí randamacha foclóir a bhfuil trí réimse ann:

  • Brainse — ainm dhíolphointe na hinstitiúide creidmheasa;
  • airgeadra — airgeadra idirbhirt;
  • méid — méid an idirbhirt. Uimhir dheimhneach a bheidh sa tsuim más ceannach airgeadra ag an mBanc é, agus uimhir dhiúltach más díolachán é.

Breathnaíonn an cód don léiritheoir mar seo:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Ansin, ag baint úsáide as an modh seolta, cuirimid teachtaireacht chuig an bhfreastalaí, chuig an ábhar a theastaíonn uainn, i bhformáid JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Agus an script á rith, faighimid na teachtaireachtaí seo a leanas sa teirminéal:

Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Ciallaíonn sé seo go n-oibríonn gach rud mar a theastaigh uainn - gineann an léiritheoir agus cuireann sé teachtaireachtaí chuig an ábhar a theastaíonn uainn.
Is é an chéad chéim eile ná Spark a shuiteáil agus an sruth teachtaireachta seo a phróiseáil.

Suiteáil Apache Spark

Apache Spark is ardán ríomhaireachta braisle uilíoch ardfheidhmíochta é.

Feidhmíonn Spark níos fearr ná feidhmiúcháin mhóréilimh an tsamhail MapReduce agus é ag tacú le raon níos leithne de chineálacha ríomha, lena n-áirítear fiosrúcháin idirghníomhacha agus próiseáil sruthanna. Tá ról tábhachtach ag luas agus iad ag próiseáil méideanna móra sonraí, ós rud é gur luas é a ligeann duit oibriú go hidirghníomhach gan nóiméad nó uaireanta feithimh a chaitheamh. Ar cheann de na láidreachtaí is mó atá ag Spark a fhágann go bhfuil sé chomh gasta sin tá a chumas chun ríomhaireachtaí cuimhneacháin a dhéanamh.

Tá an creat seo scríofa i Scala, mar sin ní mór duit é a shuiteáil ar dtús:

sudo apt-get install scala

Íoslódáil an dáileadh spark ó láithreán gréasáin oifigiúil.

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Díphacáil an chartlann:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Cuir an cosán le Spark leis an gcomhad bash:

vim ~/.bashrc

Cuir na línte seo a leanas leis tríd an eagarthóir:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Rith an t-ordú thíos tar éis athruithe a dhéanamh ar bashrc:

source ~/.bashrc

AWS PostgreSQL a imscaradh

Níl fágtha ach an bunachar sonraí a imscaradh ina ndéanfaimid an fhaisnéis phróiseáilte a uaslódáil ó na sruthanna. Chuige seo úsáidfimid seirbhís AWS RDS.

Téigh go dtí an consól AWS -> AWS RDS -> Bunachair -> Cruthaigh bunachar sonraí:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Roghnaigh PostgreSQL agus cliceáil Ar Aghaidh:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Mar Is chun críocha oideachais amháin an sampla seo; úsáidfimid freastalaí saor in aisce “ar a laghad” (Sraith In Aisce):
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Ansin, cuirimid tic sa bhloc Sraith Saor in Aisce, agus ina dhiaidh sin tairgfear sampla den rang t2.micro dúinn go huathoibríoch - cé go bhfuil sé lag, tá sé saor in aisce agus oiriúnach go leor dár tasc:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Ansin tagann rudaí an-tábhachtacha: ainm shampla an bhunachair shonraí, ainm an mháistir-úsáideoir agus a phasfhocal. Ainmnímid an sampla: myHabrTest, máistir-úsáideoir: habr, pasfhocal: habair12345 agus cliceáil ar an gcnaipe Ar Aghaidh:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Ar an gcéad leathanach eile tá paraiméadair atá freagrach as inrochtaineacht ár bhfreastalaí bunachar sonraí ón taobh amuigh (Inrochtaineacht phoiblí) agus infhaighteacht poirt:

Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Cruthaímid suíomh nua don ghrúpa slándála VPC, a cheadóidh rochtain sheachtrach ar ár bhfreastalaí bunachar sonraí trí phort 5432 (PostgreSQL).
Rachaimid chuig consól AWS i bhfuinneog brabhsálaí ar leith chuig an bPainéal VPC -> Grúpaí Slándála -> Cruthaigh rannóg grúpa slándála:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Shocraigh muid an t-ainm don ghrúpa Slándála - PostgreSQL, cur síos, cuir in iúl cén VPC ar cheart don ghrúpa seo a bheith bainteach leis agus cliceáil ar an gcnaipe Cruthaigh:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Líon isteach na rialacha Isteach le haghaidh port 5432 don ghrúpa nuachruthaithe, mar a thaispeántar sa phictiúr thíos. Ní féidir leat an calafort a shonrú de láimh, ach roghnaigh PostgreSQL ón liosta anuas Cineál.

Go docht, ciallaíonn an luach ::/0 infhaighteacht tráchta ag teacht isteach chuig an bhfreastalaí ó gach cearn den domhan, rud nach bhfuil fíor go hiomlán, ach chun anailís a dhéanamh ar an sampla, ligfimid dúinn féin an cur chuige seo a úsáid:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Fillimid ar leathanach an bhrabhsálaí, áit a bhfuil “Cumraigh ardsocruithe” againn oscailte agus roghnaigh sa rannóg grúpaí slándála VPC -> Roghnaigh grúpaí slándála VPC atá ann cheana féin -> PostgreSQL:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Ar aghaidh, sna roghanna Bunachar Sonraí -> Ainm an Bhunachair Sonraí -> socraigh an t-ainm - habrDB.

Is féidir linn na paraiméadair atá fágtha a fhágáil, cé is moite de dhíchumasú cúltaca (tréimhse coinneála cúltaca - 0 lá), monatóireacht agus Léargais Feidhmíochta, de réir réamhshocraithe. Cliceáil ar an gcnaipe Cruthaigh bunachar sonraí:
Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Láimhseálaí snáithe

Is é an chéim dheireanach ná post Spark a fhorbairt, a phróiseálfaidh sonraí nua ag teacht ó Kafka gach dhá soicind agus a chuirfidh an toradh isteach sa bhunachar sonraí.

Mar a luadh thuas, is meicníocht lárnach iad seicphointí i SparkStreaming a chaithfear a chumrú chun lamháltas locht a chinntiú. Bainfimid úsáid as seicphointí agus, má theipeann ar an nós imeachta, ní bheidh ar an modúl Spark Streaming ach filleadh ar an seicphointe deireanach agus ríomhanna a atosú uaidh chun na sonraí caillte a aisghabháil.

Is féidir seicphointeáil a chumasú trí eolaire a shocrú ar chóras comhaid iontaofa a fhulaingíonn ó thaobh lochtanna (cosúil le HDFS, S3, etc.) ina stórálfar an fhaisnéis seicphointe. Déantar é seo trí úsáid a bhaint as, mar shampla:

streamingContext.checkpoint(checkpointDirectory)

Inár sampla, úsáidfimid an cur chuige seo a leanas, eadhon, má tá checkpointDirectory ann, ansin déanfar an comhthéacs a athchruthú ó na sonraí seicphointe. Mura bhfuil an t-eolaire ann (i.e. curtha i gcrích den chéad uair), ansin glaoitear functionToCreateContext chun comhthéacs nua a chruthú agus DStreams a chumrú:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Cruthaímid oibiacht DirectStream chun nascadh leis an topaic “idirbheart” ag baint úsáide as an modh CreateDirectStream de leabharlann KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Sonraí ag teacht isteach a pharsáil i bhformáid JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Ag baint úsáide as Spark SQL, déanaimid grúpáil simplí agus taispeánaimid an toradh sa chonsól:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Téacs na ceiste a fháil agus é a rith trí Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Agus ansin sábhálaimid na sonraí comhiomlánaithe a thagann as sin i dtábla in AWS RDS. Chun na torthaí comhiomlánaithe a shábháil chuig tábla bunachar sonraí, úsáidfimid modh scríofa an oibiachta DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Cúpla focal faoi nasc a bhunú le AWS RDS. Chruthaíomar an t-úsáideoir agus an pasfhocal dó ag an gcéim “Imscaradh AWS PostgreSQL”. Ba cheart duit Endpoint a úsáid mar url an fhreastalaí bunachar sonraí, a thaispeánfar sa rannán Nascacht & slándála:

Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Chun Spark agus Kafka a nascadh i gceart, ba cheart duit an jab a rith trí smark-submit ag baint úsáide as an déantán spréach-sruthú-kafka-0-8_2.11. Ina theannta sin, úsáidfimid déantán chun idirghníomhú le bunachar sonraí PostgreSQL; aistreoimid iad trí --packages.

Ar mhaithe le solúbthacht an script, cuirfimid san áireamh freisin mar pharaiméadair ionchuir ainm an fhreastalaí teachtaireachta agus an topaic óna dteastaíonn uainn sonraí a fháil.

Mar sin, tá sé in am feidhmiúlacht an chórais a sheoladh agus a sheiceáil:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

D'oibrigh gach rud amach! Mar a fheiceann tú sa phictiúr thíos, agus an feidhmchlár ar siúl, déantar torthaí comhiomlánaithe nua a aschur gach 2 soicind, toisc gur shocraigh muid an t-eatramh baisceála go 2 soicind nuair a chruthaigh muid an réad StreamingContext:

Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Ansin, cuirimid ceist shimplí ar an mbunachar sonraí chun láithreacht na dtaifead sa tábla a sheiceáil sreabhadh_idirbheart:

Apache Kafka agus Próiseáil Sonraí Sruthaithe le Spark Streaming

Conclúid

D'fhéach an t-alt seo ar shampla de phróiseáil sruth faisnéise ag baint úsáide as Spark Streaming i gcomhar le Apache Kafka agus PostgreSQL. Le fás na sonraí ó fhoinsí éagsúla, tá sé deacair rómheastachán a dhéanamh ar luach praiticiúil Spark Streaming chun feidhmchláir sruthú agus fíor-ama a chruthú.

Is féidir leat an cód foinse iomlán a fháil i mo stór ag GitHub.

Tá áthas orm an t-alt seo a phlé, táim ag tnúth le do chuid tuairimí, agus tá súil agam freisin le cáineadh cuiditheach ó gach léitheoir comhbhách.

Is mian liom rath ort!

PS. Ar dtús bhí sé beartaithe bunachar sonraí PostgreSQL áitiúil a úsáid, ach i bhfianaise mo ghrá do AWS, chinn mé an bunachar sonraí a aistriú go dtí an scamall. Sa chéad alt eile ar an ábhar seo, taispeánfaidh mé conas an córas iomlán a thuairiscítear thuas in AWS a chur i bhfeidhm ag baint úsáide as AWS Kinesis agus AWS EMR. Lean an nuacht!

Foinse: will.com

Add a comment