Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Salve, Habr! Hodie systema aedificabimus quod Apache Kafka nuntius fluminum processit utentes Spark Streaming et scribentes eventus processus ad AWS RDS nubecularum datorum.

Fingamus aliquem institutum fidem nobis praebere negotium processus "in musca" per omnes ramos eius ineuntes. Hoc fieri potest ad prompte computandi apertam monetam positionem pro aerario, limitibus vel proventuum oeconomicorum pro transactionibus, etc.

Quomodo hanc causam efficiendi sine magicis et magicis carminibus adhibitis - sub inciso legitur! Perge!

Apache Kafka et Streaming MGE MOS cum scintilla Streaming
(Fontem Imaginis)

introduction

Utique, dispensando magnam copiam notitiarum realium temporis, amplas praebet opportunitates utendi in systematis hodiernis. Una ex maxime popularibus coniunctio haec est tandem Apache Kafka et Spark Streaming, ubi Kafka rivum advenientis nuntius facit, et scintillae processuum harum fasciculorum dato tempore interiecto.

Ad tolerantiam vitiorum augendam adhibitis, checks utemur. Cum hac mechanismo, cum scintillae Streaming machinae notitias amissas recuperare debet, solum ad ultimum LAPIS reverti ac calculos inde repetere necesse est.

Architectura systematis enucleatae

Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Components usus est:

  • Apache Kafka est distributa nuntius ratiocinandi-subscribe ratio. Apta tabitudo nuntiorum et online tam offline. Ad damna notificata impediendam, nuntii Kafka in disco reponuntur et intra botrum repplicantur. Systema Kafka aedificatum est in vertice synchronisationi zooKeeper;
  • Apache scintillam Streaming - Scintilla pars est processus effusis data. Scintilla Streaming modulus fabricatur architecturae parvae batch utens, ubi notitia amnis interpretatur continua serie rerum parvarum fasciculorum. Scintilla Streaming notitias e diversis fontibus accipit et in parvas sarcinas componit. Novae fasciculi certis intervallis creantur. Ineunte cuiusque temporis spatio, novus fasciculus creatur, et quaelibet notitia eo intervallo recepta in fasciculum comprehenditur. In fine intervalli, incrementum fasciculus sistit. Magnitudo intervalli per modulum inter- jectum appellatur ;
  • Apache scintilla SQL - processus relationis coniungit cum scintilla programmandi functionis. Data constructa significat notitia quae schema habet, id est, unum campum pro omnibus monumentis. Scintilla SQL subsidia initus ex variis fontibus structuris data et, propter schematismi promptitudinem, potest efficaciter recuperare solum debitas tabularum regiones, et etiam praebet DataFrame APIs;
  • AWS RDS est relativum insumptuosus nubilus innixus relationis datorum, servitium interretialem quae simplices habeat, operandi et scalas, et immediate ab Amazonio administratur.

Installing et currit Kafka servo

Antequam Kafka directe utatur, debes fac tibi Javam, quia... JVM adhibetur ad opus;

sudo apt-get update 
sudo apt-get install default-jre
java -version

Novam usorem cum Kafka opus faciamus:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Deinceps distributionem ex officiali Apache Kafka website accipe:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

EXIMO downloaded archivo:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Proximum est libitum. Ita res est, quod occasus defectus non te permittit ut omnes facultates Apache Kafka plene utantur. Exempli causa, delere thema, categoriam, catervam ad quam epistulae divulgari possunt. Ad hoc mutandum, configurationem limamus:

vim ~/kafka/config/server.properties

Adde sequenti usque ad finem tabella:

delete.topic.enable = true

Priusquam a servo Kafka incipias, debes servo ZooKeeper incipere, scripto auxiliario utemur quod cum distributione Kafka venit:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Postquam ZooKeeper feliciter incepit, Kafka server in separato termino detrudit:

bin/kafka-server-start.sh config/server.properties

Novum topicum nomine transactionis creare faciamus:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Faciamus ut thema cum requisitis partitionibus et replicationibus creatum sit:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Desideremus momenta probationis producentis et consumpti pro themate novo creato. Plus details quomodo probare potes mittere nuntios accipientesque in documentis officialibus scriptae sunt. Mitte aliquas epistulas. Bene movemur ad scribendum effectorem in Pythone KafkaProducer API utentem.

producentis scripturam

Faciens temere notitias generabit - 100 nuntios omnes secundos. Per temere data intelligimus glossarium tribus agris constans:

  • genere - nomen institutionis crediti punctum venditionis;
  • Monetæ - transaction monetæ;
  • Aliquam - re tantum. Quantitas numerus positivus erit si sit emptio monetae apud Argentariam, et numerus negativus si venditio est.

Codex ad producentis hoc spectat:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Deinde, methodo mittendo utens, nuntium mittimus ad ministratorem, ad thema, quod opus est, in forma JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Quando scriptum currit, epistulas sequentes in termino recipimus:

Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Hoc significat omnia opera sicut voluimus - producens generare et nuntios mittere ad rem quae opus est.
Proximus gradus est scintillam installare et hunc nuntium fluentum procedere.

Apache installing scintilla

Apache Spark est universalis et summus perficientur botrum computandi suggestum.

Scintilla meliores facit quam populares exsecutiones MapReduce exemplar, dum latius rationes computationis sustinent, inclusas interrogationes interactivas et processus amnis. Celeritas munus magni ponderis agit cum magnas copiarum notitiarum dispensando, cum celeritas illa permittit ut sine minuta vel hora exspectatione interactive laboras. One of Spark's Maximas vires, quae tam celeriter efficit, facultas est in- calculis memoriam praestare.

Hoc compage in Scala scriptum est, ut primum illud instituas necesse est;

sudo apt-get install scala

Download Scintilla distributio ex rutrum:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Archivum Unpack:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Adde viam scintillae ad fasciculi vercundus:

vim ~/.bashrc

Sequentes lineas per editorem adde:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Curre imperium infra post mutationes bashrc;

source ~/.bashrc

disponas AWS PostgreSQL

Reliquum est ad explicandum datorum, in quod notitias processitales ex rivulis onerabimus. Hac AWS RDS utemur servitio.

Ite ad AWS consolandum -> AWS RDS -> Databases -> database:
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Lego PostgreSQL ac preme Next:
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Quod Hoc exemplum in educational causa tantum est, ut libero servo "ad minimum" (liberum Tier);
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Deinde ricinum in Libero Tier trunco ​​ponimus, et postea sponte exhibebimus instantiam t2.micro classis -, licet debilis, liberam et ad negotium nostrum satis idoneam;
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Proxima magnae res veniunt: nomen exempli datorum, nomen domini utentis et tesserae eius. Exempli gratia: myHabrTest, dominus usor; habr, password: habr12345 and click on the Next button:
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

In altera pagina parametri sunt responsabiles pro accessibilitate servientis database nostri ab extra (Publicum accessibilitatem) et portum promptitudinis:

Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Novum occasum pro societate securitatis VPC creemus, quae externam accessum ad serverni datorum per portum 5432 dabunt (PostgreSQL).
Eamus ad AWS consolandum in fenestra pasco separato ad VPC Dashboard -> Securitatis Sodalitates -> Securitatem coetus crea sectionem:
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Nomen pro sodalitate Securitatis - PostgreSQL, descriptione, indicamus quae VPC hic coetus coniungi debet cum globulo crea et preme:
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Imple regulas Inbound pro portu 5432 pro coetu recenti creato, ut in imagine infra ostendetur. Portum manually exprimere non potes, sed eligere PostgreSQL ex Typus gutta-down indicem.

Proprie valor ::/0 significat facultatem adveniendi negotiationis ministratori ex toto orbe terrarum, quod canonice non omnino verum est, sed exemplum analysi, accessu hoc uti permittamus:
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Redimus ad paginam pasco, ubi "configurare occasus provectus" aperta et elige in sectione securitatis in VPC circulorum -> elige coetus securitatem VPC existentes -> PostgreSQL:
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Dein in optionibus Database -> Nomen Database -> nomen pone - habrDB.

Reliquas parametros relinquere possumus, excepto inactivare tergum (tergum retentionis periodi - 0 dierum), vigilantia et euismod Insights, per defaltam. Click in puga pyga database crea:
Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Post tracto

Ultimus scaena explicatio operis scintillae erit, quae notitias novas e Kafka singulis duobus secundis procedet et proventum datorum intrabit.

Sicut supra dictum est, checkpoints sunt nucleus mechanismus in SparkStreaming qui conformari debet ad tolerantiam culpae praestandam. checks punctis utemur et, si ratio deficit, scintillae Streaming moduli tantum opus est ad ultimum LAPIS redire et ex eo calculos resumere ad datam amissam recuperandam.

Reprehensio possit praebere indicem in culpa toleranti, certae rationi fasciculi (ut HDFS, S3, etc.) in qua notitia LAPIS reponenda est. Hoc fit utens, verbi gratia:

streamingContext.checkpoint(checkpointDirectory)

In exemplo nostro sequenti utemur accessu, nempe, si check punctumDirectorium existit, contextus e notitia LAPIS recreabitur. Si directorium non est (i.e. primum supplicium), tunc munusToCreateContext appellatur ad novum contextum creandum et DStreams configurandum:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

DirectStream rem crearemus ad coniungere cum themate "transactionis" utendi methodum bibliothecae KafkaUtils createDirectStream:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Parsing advenientis notitia in forma JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Scintilla SQL utendo, simplicem copulationem facimus et eventum in console exhibemus:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Questus est textus interrogationis et currit per Scintillam SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Et tunc servamus datam aggregatam inde in tabulam in AWS RDS. Ut proventus aggregationis ad mensam database, scribe methodo obiecti DataFrame utemur;

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Pauca verba de nexum constituendo ad AWS RDS. Usorem et tesseram creavimus pro eo in "explicando AWS PostgreSQL" gradum. Endpoint uti debes sicut URL server datorum, qui in sectione securitatis & Connectivity ostenditur:

Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Ut scintillam et Kafka recte iungas, officium debes per submittere artificio utendo. scintilla amne-kafka-0-8_2.11. Accedit, artificio etiam utemur ad commercium cum datorum PostgreSQL, ea per --packages transferemus.

Ad scriptionis flexibilitatem, etiam input parametri nomen ponemus nuntii servi et locum unde notitia accipere volumus.

Tempus itaque est mittere ac deprimere rationem functionis;

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Omnia elaborata! Ut videre potes in tabula infra, dum applicatio currit, novae aggregationis eventus sunt output omnes 2 secundis, quia intervallum ad secundas 2 secundas constituimus cum obiectum StreamingContext creavimus:

Apache Kafka et Streaming MGE MOS cum scintilla Streaming

Deinde simplicem interrogationem datorum ut reprimantur praesentia monumentorum in mensa transaction_flow:

Apache Kafka et Streaming MGE MOS cum scintilla Streaming

conclusio,

Articulus hic exemplum spectavit processus amnis informationis utendi Spark Streaming in conjunctione cum Apache Kafka et PostgreSQL. Cum ex variis fontibus auctum notitiarum, difficile est aestimare vim practicam Spark Streaming ad diffluentes et reales applicationes creandas.

Potes invenire plenam source codice in repositio at GitHub.

Laetus sum de hoc articulo disputare, tuas commentarios exspecto, et etiam spero criticam structuram ab omnibus curantibus legentibus.

Utinam fortuna!

. Initio destinatum est uti database locali PostgreSQL, sed amorem meum AWS datum, decrevi datorum nubem movere. In sequenti articulo de hoc argumento docebo quomodo totam systema, de quibus supra in AWS, utentibus AWS Kinesis et AWS EMR, docebo. Sequere nuntium!

Source: www.habr.com

Add a comment