Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ሃይ ሀብር! ዛሬ የስፓርክ ዥረትን በመጠቀም Apache Kafka የመልእክት ዥረቶችን የሚያስኬድ እና የማቀነባበሪያ ውጤቱን ወደ AWS RDS ደመና ዳታቤዝ የምንጽፍበት ስርዓት እንገነባለን።

አንድ የተወሰነ የብድር ተቋም ለሁሉም ቅርንጫፎቹ "በመብረር" የሚመጡ ግብይቶችን የማስኬድ ሥራ ከፊታችን እንደሚጠብቀን እናስብ። ይህ ለግምጃ ቤት ክፍት ምንዛሪ አቀማመጥ በፍጥነት ለማስላት ዓላማ ሊከናወን ይችላል ፣ በግብይቶች ላይ ገደቦች ወይም የገንዘብ ውጤቶች ፣ ወዘተ.

ይህንን ጉዳይ አስማት እና አስማት ሳይጠቀሙ እንዴት እንደሚተገበሩ - በቆራጩ ስር ያንብቡ! ሂድ!

Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት
(የሥዕል ምንጭ)

መግቢያ

በእርግጥ ከፍተኛ መጠን ያለው መረጃን በእውነተኛ ጊዜ ማቀናበር በዘመናዊ ስርዓቶች ውስጥ ለመጠቀም ሰፊ እድሎችን ይሰጣል። ለዚህ በጣም ታዋቂ ከሆኑ ጥምሮች አንዱ የApache Kafka እና Spark Streaming ታንደም ነው፣ ካፍካ የገቢ መልእክት ፓኬቶች ዥረት ይፈጥራል፣ እና Spark Streaming እነዚህን እሽጎች በተወሰነ የጊዜ ልዩነት ያስኬዳል።

የመተግበሪያውን ስህተት መቻቻል ለማሻሻል, የፍተሻ ነጥቦችን - የፍተሻ ነጥቦችን እንጠቀማለን. በዚህ ዘዴ፣ የስፓርክ ዥረት ሞጁል የጠፋውን ውሂብ መልሶ ማግኘት ሲፈልግ፣ ወደ መጨረሻው የፍተሻ ነጥብ ብቻ መመለስ እና ከዚያ ስሌቶችን መቀጠል አለበት።

የተገነባው ስርዓት አርክቴክቸር

Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ያገለገሉ ክፍሎች፡-

  • Apache Kafka የተሰራጨ የህትመት እና የደንበኝነት መመዝገብ ስርዓት ነው። ለሁለቱም ከመስመር ውጭ እና የመስመር ላይ የመልእክት ፍጆታ ተስማሚ። የውሂብ መጥፋትን ለመከላከል የካፍካ መልዕክቶች በዲስክ ላይ ተከማችተው በክላስተር ውስጥ ይባዛሉ። የካፍካ ስርዓት የተገነባው በ ZooKeeper ማመሳሰል አገልግሎት ላይ ነው።
  • Apache Spark ዥረት - የዥረት ውሂብን ለማስኬድ የስፓርክ አካል። የስፓርክ ዥረት ሞጁል የተገነባው በማይክሮ-ባች አርክቴክቸር ሲሆን የውሂብ ዥረት እንደ ተከታታይ ጥቃቅን የውሂብ እሽጎች ሲተረጎም ነው። ስፓርክ ዥረት ከተለያዩ ምንጮች መረጃን ይወስዳል እና ወደ ትናንሽ ስብስቦች ያዋህደዋል። አዲስ ፓኬጆች በመደበኛ ክፍተቶች ይፈጠራሉ. በእያንዳንዱ የጊዜ ክፍተት መጀመሪያ ላይ አዲስ ፓኬት ይፈጠራል እና በዚህ ጊዜ ውስጥ የተቀበለው ማንኛውም ውሂብ በጥቅሉ ውስጥ ይካተታል። በክፍተቱ መጨረሻ ላይ የፓኬት እድገት ይቆማል. የክፍተቱ መጠን የሚወሰነው ባች ክፍተት ተብሎ በሚጠራው መለኪያ ነው;
  • Apache Spark SQL - ተዛማጅ ሂደትን ከ Spark ተግባራዊ ፕሮግራሚንግ ጋር ያጣምራል። የተዋቀረ መረጃ ንድፍ ያለው ውሂብን ማለትም ለሁሉም መዝገቦች አንድ ነጠላ የመስክ ስብስብን ያመለክታል። Spark SQL ከተለያዩ የተዋቀሩ የመረጃ ምንጮች ግብአትን ይደግፋል እና በሼማ መረጃ መገኘት ምክንያት የሚፈለጉትን የመዝገቦችን መስኮች በብቃት ሰርሾሎ ማውጣት ይችላል እንዲሁም DataFrame APIs ያቀርባል።
  • AWS RDS በአማዞን በቀጥታ የሚተዳደረው በአንፃራዊነት ርካሽ በሆነ ደመና ላይ የተመሰረተ ግንኙነት ዳታቤዝ፣ ማዋቀርን፣ አሠራርን እና ልኬትን ቀላል የሚያደርግ የድር አገልግሎት ነው።

የካፍካ አገልጋይ መጫን እና ማሄድ

ካፍካን በቀጥታ ከመጠቀምዎ በፊት ጃቫ እንዳለዎት ማረጋገጥ አለብዎት, ምክንያቱም JVM ለስራ ጥቅም ላይ ይውላል፡-

sudo apt-get update 
sudo apt-get install default-jre
java -version

ከካፍካ ጋር ለመስራት አዲስ ተጠቃሚ እንፍጠር፡-

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

በመቀጠል የማከፋፈያ መሳሪያውን ከኦፊሴላዊው Apache Kafka ድህረ ገጽ ያውርዱ፡

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

የወረደውን ማህደር ያውጡ፡

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

ቀጣዩ ደረጃ አማራጭ ነው. እውነታው ግን ነባሪ ቅንጅቶች ሁሉንም የ Apache Kafka ባህሪያት ሙሉ በሙሉ እንዲጠቀሙ አይፈቅዱም. ለምሳሌ መልዕክቶች የሚታተሙበትን ርዕስ፣ ምድብ፣ ቡድን ይሰርዙ። ይህንን ለመለወጥ፣ የውቅረት ፋይሉን እናርትዕ፡-

vim ~/kafka/config/server.properties

በፋይሉ መጨረሻ ላይ የሚከተለውን ያክሉ።

delete.topic.enable = true

የካፍካ አገልጋይን ከመጀመርዎ በፊት የ ZooKeeper አገልጋይን መጀመር ያስፈልግዎታል ፣ ከካፍካ ስርጭት ጋር የሚመጣውን አጋዥ ስክሪፕት እንጠቀማለን

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper በተሳካ ሁኔታ ከጀመረ በኋላ የካፍካ አገልጋይ በተለየ ተርሚናል ውስጥ እንጀምራለን፡

bin/kafka-server-start.sh config/server.properties

ግብይት የሚባል አዲስ ርዕስ እንፍጠር፡-

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

የሚፈለገው ክፍልፋዮች እና ድግግሞሽ ብዛት ያለው ርዕስ መፈጠሩን እናረጋግጥ፡-

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

አዲስ ለተፈጠረው ርዕስ አምራቹን እና ሸማቹን የምንሞክረው ጊዜዎችን እናንሳ። መልዕክቶችን መላክ እና መቀበልን እንዴት መሞከር እንደሚችሉ ላይ ተጨማሪ ዝርዝሮች በይፋዊ ሰነዶች ውስጥ ተጽፈዋል - አንዳንድ መልዕክቶችን ላክ. ደህና፣ የ KafkaProducer API በመጠቀም ፕሮዲዩሰርን በፓይዘን ለመጻፍ እንቀጥላለን።

የአዘጋጅ ጽሑፍ

አምራቹ በዘፈቀደ መረጃ ያመነጫል - በየሰከንዱ 100 መልዕክቶች። የዘፈቀደ መረጃ ስንል ሦስት መስኮችን የያዘ መዝገበ ቃላት ማለታችን ነው።

  • ቅርንጫፍ - የብድር ተቋም የሚሸጥበት ቦታ ስም;
  • ገንዘብ - የግብይት ምንዛሬ;
  • መጠን - የግብይት መጠን. ገንዘቡ በባንኩ የመገበያያ ገንዘብ ግዢ ከሆነ, እና ሽያጭ ከሆነ አሉታዊ ይሆናል.

የአምራቹ ኮድ ይህንን ይመስላል።

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

በመቀጠል፣ የመላኪያ ዘዴን በመጠቀም፣ መልእክት ወደ አገልጋዩ፣ ወደምንፈልገው ርዕስ፣ በJSON ቅርጸት እንልካለን።

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

ስክሪፕቱን ስናሄድ በተርሚናል ውስጥ የሚከተሉትን መልዕክቶች እናገኛለን፡-

Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ይህ ማለት ሁሉም ነገር እኛ እንደፈለግን ነው የሚሰራው - አምራቹ ወደምንፈልገው ርዕስ መልእክት ያመነጫል እና ይልካል።
ቀጣዩ እርምጃ ስፓርክን መጫን እና ይህን የመልእክት ፍሰት ማካሄድ ነው።

Apache Spark ን በመጫን ላይ

የ Apache Spark ሁለገብ እና ከፍተኛ አፈጻጸም ክላስተር ማስላት መድረክ ነው።

በይነተገናኝ መጠይቆችን እና ዥረት መልቀቅን ጨምሮ ለተለያዩ የስሌት አይነቶች ድጋፍ ሲሰጥ ስፓርክ ከMapReduce ሞዴል ታዋቂ አተገባበርን በአፈጻጸም ይበልጣል። ደቂቃዎችን ወይም ሰዓታትን ሳይጠብቁ በይነተገናኝ እንዲሰሩ የሚያስችልዎ ፍጥነት ስለሆነ ከፍተኛ መጠን ያለው ውሂብ በሚሰራበት ጊዜ ፍጥነት ትልቅ ሚና ይጫወታል። ይህን ፍጥነት ለማድረስ የስፓርክ ትልቁ ጥንካሬዎች አንዱ የማስታወሻ ስሌቶችን የመስራት ችሎታ ነው።

ይህ ማዕቀፍ በ Scala ውስጥ ተጽፏል፣ ስለዚህ መጀመሪያ መጫን ያስፈልግዎታል፡-

sudo apt-get install scala

የስፓርክ ስርጭትን ከኦፊሴላዊው ድር ጣቢያ ያውርዱ።

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

ማህደሩን ይንቀሉት፡-

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

ወደ Spark የሚወስደውን መንገድ ወደ bash ፋይል ያክሉ፡-

vim ~/.bashrc

የሚከተሉትን መስመሮች በአርታዒው በኩል ያክሉ።

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

በ bashrc ላይ ለውጦችን ካደረጉ በኋላ የሚከተለውን ትዕዛዝ ያሂዱ:

source ~/.bashrc

AWS PostgreSQLን በማሰማራት ላይ

የውሂብ ጎታውን ለማስፋፋት ይቀራል, የተቀነባበረውን መረጃ ከጅረቶች ውስጥ የምንሞላበት. ይህንን ለማድረግ የAWS RDS አገልግሎትን እንጠቀማለን።

ወደ AWS ኮንሶል ይሂዱ -> AWS RDS -> የውሂብ ጎታዎች -> የውሂብ ጎታ ይፍጠሩ፡
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

PostgreSQL ን ይምረጡ እና የሚቀጥለውን ቁልፍ ጠቅ ያድርጉ።
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ምክንያቱም ይህ ምሳሌ የተተነተነው ለትምህርታዊ ዓላማዎች ብቻ ነው፣ እኛ “ቢያንስ” (ነፃ ደረጃ) ነፃ አገልጋይ እንጠቀማለን።
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

በመቀጠል ነፃ ደረጃ በሚለው ሳጥን ላይ ምልክት ያድርጉ እና ከዚያ በኋላ የ t2.micro ክፍልን በራስ-ሰር እንሰጠዋለን - ምንም እንኳን ደካማ ፣ ግን ነፃ እና ለተግባራችን ተስማሚ ነው።
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

በጣም አስፈላጊ ነገሮች ይከተላሉ-የዲቢ ምሳሌ ስም, ዋና ተጠቃሚ ስም እና የይለፍ ቃሉ. ምሳሌውን እንጥራ፡ myHabrTest፣ ዋና ተጠቃሚ፡ ሀብር, የይለፍ ቃል: ሀብር 12345 እና በሚቀጥለው ቁልፍ ላይ ጠቅ ያድርጉ:
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ቀጣዩ ገጽ ለዳታቤዝ አገልጋያችን ከውጭ ተደራሽነት (የሕዝብ ተደራሽነት) እና ወደቦች መገኘት ኃላፊነት ያላቸውን መለኪያዎች ይዟል።

Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ለVPC ደህንነት ቡድን አዲስ መቼት እንፍጠር፣ ይህም ወደ የውሂብ ጎታችን አገልጋይ በፖርት 5432 (PostgreSQL) ውጫዊ መዳረሻ ይፈቅዳል።
በተለየ የአሳሽ መስኮት ወደ AWS ኮንሶል በVPC Dashboard ->የደህንነት ቡድኖች ->የደህንነት ቡድን ክፍል እንፍጠር፡
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ለደህንነት ቡድን ስም አዘጋጅተናል - PostgreSQL ፣ መግለጫ ፣ ይህ ቡድን ከየትኛው VPC ጋር መያያዝ እንዳለበት ይግለጹ እና ፍጠር ቁልፍን ጠቅ ያድርጉ።
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ከታች በምስሉ ላይ እንደሚታየው አዲስ ለተፈጠረው ቡድን ወደብ 5432 የመግቢያ ደንቦችን እንሞላለን። ወደቡን እራስዎ መግለጽ አይችሉም ነገር ግን ከተቆልቋይ ዓይነት ዝርዝር ውስጥ PostgreSQL ን ይምረጡ።

በትክክል ስንናገር እሴቱ ::/0 ማለት ከአለም ዙሪያ ለአገልጋዩ ገቢ ትራፊክ መገኘት ማለት ነው ፣ይህም በቀኖና እውነት አይደለም ፣ ግን ምሳሌውን ለመተንተን ፣ ይህንን አካሄድ እንጠቀም።
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ወደ አሳሹ ገጽ እንመለሳለን ፣ “የላቁ ቅንብሮችን ያዋቅሩ” ይክፈቱ እና የ VPC ደህንነት ቡድኖችን ይምረጡ -> ያሉትን የ VPC ደህንነት ቡድኖችን ይምረጡ -> PostgreSQL በክፍል ውስጥ።
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

በመቀጠል በክፍል ውስጥ የውሂብ ጎታ አማራጮች -> የውሂብ ጎታ ስም -> ስሙን ያዘጋጁ - habrDB.

በነባሪነት ምትኬን ከማሰናከል (የምትኬ ማቆያ ጊዜ - 0 ቀናት) ፣ ክትትል እና የአፈፃፀም ግንዛቤዎችን ከማሰናከል በስተቀር የተቀሩትን መለኪያዎች መተው እንችላለን። አዝራሩን ጠቅ ያድርጉ የውሂብ ጎታ ይፍጠሩ:
Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

የዥረት ተቆጣጣሪ

የመጨረሻው ደረጃ የ Spark ሥራን ማዳበር ይሆናል, ይህም በየሁለት ሰከንድ ከካፍ አዲስ መረጃን በማካሄድ ውጤቱን ወደ ዳታቤዝ ያስገባል.

ከላይ እንደተገለፀው የፍተሻ ነጥቦች በስፓርክ ዥረት ውስጥ ያሉ ስህተቶችን መቻቻልን ለመስጠት መዋቀር ያለባቸው ዋና ዘዴዎች ናቸው። የፍተሻ ነጥቦችን እንጠቀማለን እና የሂደቱ ብልሽት ሲከሰት የስፓርክ ዥረት ሞጁል የጠፋውን መረጃ ለማግኘት ወደ መጨረሻው የፍተሻ ነጥብ መመለስ እና ከሱ ላይ ስሌቶችን መቀጠል ብቻ ይፈልጋል።

የፍተሻ ነጥቡ መረጃ በሚከማችበት ስህተትን የሚታገስ አስተማማኝ የፋይል ስርዓት (ለምሳሌ ኤችዲኤፍኤስ፣ ኤስ 3፣ ወዘተ) ላይ ማውጫ በማዘጋጀት የፍተሻ ነጥብን ማንቃት ይቻላል። ይህ የሚከናወነው ለምሳሌ በ:

streamingContext.checkpoint(checkpointDirectory)

በእኛ ምሳሌ ውስጥ ፣ የሚከተለውን አካሄድ እንጠቀማለን ፣ ማለትም ፣ የፍተሻ ነጥብ ዳይሬክተሩ ካለ ፣ ከዚያ አውድ ከቁጥጥር መረጃ እንደገና ይፈጠራል። ማውጫው ከሌለ (ማለትም ለመጀመሪያ ጊዜ እየተፈፀመ ነው)፣ ከዚያ የFunctionToCreateContext ተግባር አዲስ አውድ ለመፍጠር እና DStreamsን ለማዘጋጀት ይጠራል፡-

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

የKafkaUtils ቤተ-መጽሐፍትን createDirectStream ዘዴን በመጠቀም ከ"ግብይት" ርዕስ ጋር ለመገናኘት DirectStream ነገርን እንፈጥራለን፡

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

ገቢ ውሂብን በJSON ቅርጸት መተንተን፡-

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL ን በመጠቀም ቀላል መቧደን እና ውጤቱን ወደ ኮንሶሉ እናወጣለን፡

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

የጥያቄውን አካል ማግኘት እና በስፓርክ SQL በኩል ማስኬድ፡-

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

እና ከዚያ የተቀበለውን የተዋሃደ ውሂብ በ AWS RDS ውስጥ ባለው ሠንጠረዥ ውስጥ እናስቀምጠዋለን። የውህደቱን ውጤት ወደ የውሂብ ጎታ ሠንጠረዥ ለማስቀመጥ የዳታ ፍሬም ነገርን የመፃፍ ዘዴን እንጠቀማለን፡-

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

ከAWS RDS ጋር ግንኙነት ስለማዘጋጀት ጥቂት ቃላት። ተጠቃሚውን እና የይለፍ ቃሉን በ"Deploying AWS PostgreSQL" ደረጃ ፈጠርን። እንደ የውሂብ ጎታ አገልጋይ ዩአርኤል፣ የግንኙነት እና ደህንነት ክፍል ውስጥ የሚታየውን የመጨረሻ ነጥብ መጠቀም አለቦት።

Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

ስፓርክን እና ካፍካን በትክክል ለማገናኘት አርቲፊኬቱን በመጠቀም ስራውን በ smark-submit ማካሄድ አለብዎት። ብልጭታ-ዥረት-kafka-0-8_2.11. በተጨማሪም፣ ከPostgreSQL ዳታቤዝ ጋር ለመግባባት ቅርስ እንጠቀማለን፣ በጥቅሎች እናልፋቸዋለን።

ለስክሪፕቱ ተለዋዋጭነት የመልእክት አገልጋዩን ስም እና መረጃን እንደ ግብዓት መለኪያዎች መቀበል የምንፈልገውን ርዕስ እናወጣለን።

ስለዚህ ስርዓቱን ለማሄድ እና ለመሞከር ጊዜው አሁን ነው-

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

ሁሉም ነገር ተሳካ! ከታች በምስሉ ላይ እንደምታዩት አፕሊኬሽኑ እየሄደ እያለ በየ 2 ሰከንድ አዳዲስ የውህደት ውጤቶች ይታያሉ ምክንያቱም የዥረት አውድ ነገርን ስንፈጥር የጥቅል ክፍተቱን ወደ 2 ሰከንድ አዘጋጅተናል።

Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

በመቀጠል, በሰንጠረዡ ውስጥ መዝገቦችን ለመፈተሽ ወደ ዳታቤዝ አንድ ቀላል መጠይቅ እናደርጋለን የግብይት_ፍሰት:

Apache Kafka እና የውሂብ ዥረት በስፓርክ ዥረት

መደምደሚያ

በዚህ ጽሑፍ ውስጥ፣ Spark Streamingን በመጠቀም ከApache Kafka እና PostgreSQL ጋር በመተባበር መረጃን የማሰራጨት ምሳሌ ተወስዷል። ከተለያዩ ምንጮች የተገኘ መረጃ መጠን እያደገ በመምጣቱ፣ የእውነተኛ ጊዜ እና የዥረት አፕሊኬሽኖችን ለመፍጠር የስፓርክ ዥረት ተግባራዊ ጠቀሜታን መገመት ከባድ ነው።

ሙሉውን የምንጭ ኮድ በእኔ ማከማቻ ውስጥ ማግኘት ይችላሉ። የፊልሙ.

በዚህ ጽሑፍ ላይ ለመወያየት ደስተኛ ነኝ, አስተያየቶችዎን በጉጉት እጠብቃለሁ, እና ደግሞ, ከሁሉም የሚመለከታቸው አንባቢዎች ገንቢ ትችት እንደሚሰጥ ተስፋ አደርጋለሁ.

ስኬት እመኛለሁ!

መዝ. በመጀመሪያ የአካባቢ የ PostgreSQL የውሂብ ጎታ ለመጠቀም ታቅዶ ነበር፣ ነገር ግን ለAWS ካለኝ ፍቅር አንጻር የውሂብ ጎታውን ወደ ደመናው ለማንቀሳቀስ ወሰንኩ። በዚህ ርዕስ ላይ በሚቀጥለው መጣጥፍ, AWS Kinesis እና AWS EMR በመጠቀም በ AWS ውስጥ ከላይ የተገለፀውን አጠቃላይ ስርዓት እንዴት እንደሚተገበሩ አሳይዎታለሁ. ዜናውን ተከታተሉ!

ምንጭ: hab.com

አስተያየት ያክሉ