స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

హలో, హబ్ర్! ఈ రోజు మనం స్పార్క్ స్ట్రీమింగ్‌ని ఉపయోగించి అపాచీ కాఫ్కా సందేశ ప్రసారాలను ప్రాసెస్ చేసే సిస్టమ్‌ను రూపొందిస్తాము మరియు ప్రాసెసింగ్ ఫలితాలను AWS RDS క్లౌడ్ డేటాబేస్‌కు వ్రాస్తాము.

ఒక నిర్దిష్ట క్రెడిట్ సంస్థ తన శాఖలన్నింటిలో "ఫ్లైలో" ఇన్‌కమింగ్ లావాదేవీలను ప్రాసెస్ చేసే పనిని సెట్ చేస్తుందని ఊహించుకుందాం. ట్రెజరీ కోసం ఓపెన్ కరెన్సీ స్థానం, పరిమితులు లేదా లావాదేవీల కోసం ఆర్థిక ఫలితాలు మొదలైనవాటిని తక్షణమే లెక్కించడం కోసం ఇది చేయవచ్చు.

మేజిక్ మరియు మాయా మంత్రాలను ఉపయోగించకుండా ఈ కేసును ఎలా అమలు చేయాలి - కట్ కింద చదవండి! వెళ్ళండి!

స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్
(చిత్రం మూలం)

పరిచయం

వాస్తవానికి, నిజ సమయంలో పెద్ద మొత్తంలో డేటాను ప్రాసెస్ చేయడం ఆధునిక వ్యవస్థలలో ఉపయోగించడానికి పుష్కల అవకాశాలను అందిస్తుంది. దీనికి అత్యంత ప్రజాదరణ పొందిన కలయికలలో ఒకటి అపాచీ కాఫ్కా మరియు స్పార్క్ స్ట్రీమింగ్ యొక్క టెన్డం, ఇక్కడ కాఫ్కా ఇన్‌కమింగ్ మెసేజ్ ప్యాకెట్‌ల స్ట్రీమ్‌ను సృష్టిస్తుంది మరియు స్పార్క్ స్ట్రీమింగ్ ఈ ప్యాకెట్‌లను ఇచ్చిన సమయ వ్యవధిలో ప్రాసెస్ చేస్తుంది.

అప్లికేషన్ యొక్క తప్పు సహనాన్ని పెంచడానికి, మేము తనిఖీ కేంద్రాలను ఉపయోగిస్తాము. ఈ మెకానిజంతో, స్పార్క్ స్ట్రీమింగ్ ఇంజిన్ కోల్పోయిన డేటాను తిరిగి పొందవలసి వచ్చినప్పుడు, అది చివరి చెక్‌పాయింట్‌కి తిరిగి వెళ్లి, అక్కడ నుండి గణనలను పునఃప్రారంభించాలి.

అభివృద్ధి చెందిన వ్యవస్థ యొక్క ఆర్కిటెక్చర్

స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

ఉపయోగించిన భాగాలు:

  • అపాచీ కాఫ్కా పంపిణీ చేయబడిన ప్రచురణ-చందా సందేశ వ్యవస్థ. ఆఫ్‌లైన్ మరియు ఆన్‌లైన్ సందేశ వినియోగం రెండింటికీ అనుకూలం. డేటా నష్టాన్ని నివారించడానికి, కాఫ్కా సందేశాలు డిస్క్‌లో నిల్వ చేయబడతాయి మరియు క్లస్టర్‌లో ప్రతిరూపం చేయబడతాయి. కాఫ్కా సిస్టమ్ ZooKeeper సింక్రొనైజేషన్ సర్వీస్ పైన నిర్మించబడింది;
  • అపాచీ స్పార్క్ స్ట్రీమింగ్ - స్ట్రీమింగ్ డేటాను ప్రాసెస్ చేయడానికి స్పార్క్ భాగం. స్పార్క్ స్ట్రీమింగ్ మాడ్యూల్ మైక్రో-బ్యాచ్ ఆర్కిటెక్చర్‌ను ఉపయోగించి నిర్మించబడింది, ఇక్కడ డేటా స్ట్రీమ్ చిన్న డేటా ప్యాకెట్‌ల యొక్క నిరంతర క్రమం వలె వివరించబడుతుంది. స్పార్క్ స్ట్రీమింగ్ వివిధ మూలాల నుండి డేటాను తీసుకుంటుంది మరియు దానిని చిన్న ప్యాకేజీలుగా మిళితం చేస్తుంది. కొత్త ప్యాకేజీలు క్రమ వ్యవధిలో సృష్టించబడతాయి. ప్రతి సమయ విరామం ప్రారంభంలో, కొత్త ప్యాకెట్ సృష్టించబడుతుంది మరియు ఆ వ్యవధిలో అందుకున్న ఏదైనా డేటా ప్యాకెట్‌లో చేర్చబడుతుంది. విరామం ముగింపులో, ప్యాకెట్ పెరుగుదల ఆగిపోతుంది. విరామం యొక్క పరిమాణం బ్యాచ్ విరామం అని పిలువబడే పరామితి ద్వారా నిర్ణయించబడుతుంది;
  • అపాచీ స్పార్క్ SQL - స్పార్క్ ఫంక్షనల్ ప్రోగ్రామింగ్‌తో రిలేషనల్ ప్రాసెసింగ్‌ను మిళితం చేస్తుంది. స్ట్రక్చర్డ్ డేటా అంటే స్కీమా ఉన్న డేటా, అంటే అన్ని రికార్డ్‌ల కోసం ఒకే సెట్ ఫీల్డ్‌లు. Spark SQL వివిధ నిర్మాణాత్మక డేటా మూలాల నుండి ఇన్‌పుట్‌కు మద్దతు ఇస్తుంది మరియు స్కీమా సమాచారం యొక్క లభ్యతకు ధన్యవాదాలు, ఇది అవసరమైన రికార్డ్‌ల ఫీల్డ్‌లను మాత్రమే సమర్ధవంతంగా తిరిగి పొందగలదు మరియు డేటాఫ్రేమ్ APIలను కూడా అందిస్తుంది;
  • AWS RDS సాపేక్షంగా చవకైన క్లౌడ్-ఆధారిత రిలేషనల్ డేటాబేస్, సెటప్, ఆపరేషన్ మరియు స్కేలింగ్‌ను సులభతరం చేసే వెబ్ సేవ మరియు నేరుగా Amazon ద్వారా నిర్వహించబడుతుంది.

కాఫ్కా సర్వర్‌ను ఇన్‌స్టాల్ చేయడం మరియు అమలు చేయడం

కాఫ్కాను నేరుగా ఉపయోగించే ముందు, మీరు జావాను కలిగి ఉన్నారని నిర్ధారించుకోవాలి, ఎందుకంటే... JVM పని కోసం ఉపయోగించబడుతుంది:

sudo apt-get update 
sudo apt-get install default-jre
java -version

కాఫ్కాతో పని చేయడానికి కొత్త వినియోగదారుని సృష్టిద్దాం:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

తర్వాత, అధికారిక Apache Kafka వెబ్‌సైట్ నుండి పంపిణీని డౌన్‌లోడ్ చేయండి:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

డౌన్‌లోడ్ చేసిన ఆర్కైవ్‌ను అన్‌ప్యాక్ చేయండి:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

తదుపరి దశ ఐచ్ఛికం. వాస్తవం ఏమిటంటే డిఫాల్ట్ సెట్టింగ్‌లు అపాచీ కాఫ్కా యొక్క అన్ని లక్షణాలను పూర్తిగా ఉపయోగించడానికి మిమ్మల్ని అనుమతించవు. ఉదాహరణకు, సందేశాలను ప్రచురించగల అంశం, వర్గం, సమూహాన్ని తొలగించండి. దీన్ని మార్చడానికి, కాన్ఫిగరేషన్ ఫైల్‌ని ఎడిట్ చేద్దాం:

vim ~/kafka/config/server.properties

ఫైల్ చివర కింది వాటిని జోడించండి:

delete.topic.enable = true

కాఫ్కా సర్వర్‌ని ప్రారంభించే ముందు, మీరు ZooKeeper సర్వర్‌ని ప్రారంభించాలి; మేము కాఫ్కా పంపిణీతో వచ్చే సహాయక స్క్రిప్ట్‌ని ఉపయోగిస్తాము:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper విజయవంతంగా ప్రారంభించిన తర్వాత, కాఫ్కా సర్వర్‌ను ప్రత్యేక టెర్మినల్‌లో ప్రారంభించండి:

bin/kafka-server-start.sh config/server.properties

లావాదేవీ అనే కొత్త టాపిక్‌ని క్రియేట్ చేద్దాం:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

అవసరమైన సంఖ్యలో విభజనలు మరియు ప్రతిరూపణతో ఒక అంశం సృష్టించబడిందని నిర్ధారించుకోండి:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

కొత్తగా సృష్టించబడిన అంశం కోసం నిర్మాత మరియు వినియోగదారుని పరీక్షించే క్షణాలను మిస్ చేద్దాం. మీరు సందేశాలను పంపడం మరియు స్వీకరించడం ఎలా పరీక్షించవచ్చనే దాని గురించి మరిన్ని వివరాలు అధికారిక డాక్యుమెంటేషన్‌లో వ్రాయబడ్డాయి - కొన్ని సందేశాలు పంపండి. సరే, మేము కాఫ్కాప్రొడ్యూసర్ APIని ఉపయోగించి పైథాన్‌లో ప్రొడ్యూసర్‌ని వ్రాయడం కొనసాగిస్తాము.

నిర్మాత రచన

నిర్మాత యాదృచ్ఛిక డేటాను రూపొందిస్తారు - ప్రతి సెకనుకు 100 సందేశాలు. యాదృచ్ఛిక డేటా ద్వారా మేము మూడు ఫీల్డ్‌లతో కూడిన నిఘంటువు అని అర్థం:

  • బ్రాంచ్ - క్రెడిట్ ఇన్‌స్టిట్యూషన్ పాయింట్ ఆఫ్ సేల్ పేరు;
  • కరెన్సీ - లావాదేవీ కరెన్సీ;
  • మొత్తం - లావాదేవి మొత్తం. బ్యాంక్ ద్వారా కరెన్సీని కొనుగోలు చేసినట్లయితే మొత్తం సానుకూల సంఖ్యగా ఉంటుంది మరియు అమ్మకం అయితే ప్రతికూల సంఖ్యగా ఉంటుంది.

నిర్మాత కోసం కోడ్ ఇలా కనిపిస్తుంది:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

తరువాత, పంపే పద్ధతిని ఉపయోగించి, మేము సర్వర్‌కు, మనకు అవసరమైన అంశానికి, JSON ఆకృతిలో సందేశాన్ని పంపుతాము:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

స్క్రిప్ట్‌ను అమలు చేస్తున్నప్పుడు, మేము టెర్మినల్‌లో క్రింది సందేశాలను అందుకుంటాము:

స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

అంటే అన్నీ మనం కోరుకున్నట్లే పనిచేస్తాయని అర్థం - ప్రొడ్యూసర్ మనకు అవసరమైన టాపిక్‌కి మెసేజ్‌లను రూపొందించి పంపుతాడు.
తదుపరి దశ స్పార్క్‌ని ఇన్‌స్టాల్ చేసి, ఈ సందేశ ప్రసారాన్ని ప్రాసెస్ చేయడం.

Apache Sparkని ఇన్‌స్టాల్ చేస్తోంది

అపాచీ స్పార్క్ సార్వత్రిక మరియు అధిక-పనితీరు గల క్లస్టర్ కంప్యూటింగ్ ప్లాట్‌ఫారమ్.

ఇంటరాక్టివ్ ప్రశ్నలు మరియు స్ట్రీమ్ ప్రాసెసింగ్‌తో సహా విస్తృత శ్రేణి గణన రకాలకు మద్దతునిస్తూ MapReduce మోడల్ యొక్క జనాదరణ పొందిన అమలుల కంటే స్పార్క్ మెరుగ్గా పని చేస్తుంది. పెద్ద మొత్తంలో డేటాను ప్రాసెస్ చేసేటప్పుడు వేగం ఒక ముఖ్యమైన పాత్ర పోషిస్తుంది, ఎందుకంటే ఇది నిమిషాలు లేదా గంటలు వేచి ఉండకుండా ఇంటరాక్టివ్‌గా పని చేయడానికి మిమ్మల్ని అనుమతిస్తుంది. స్పార్క్ యొక్క అతిపెద్ద బలాల్లో ఒకటి మెమరీలో గణనలను నిర్వహించగల సామర్థ్యం.

ఈ ఫ్రేమ్‌వర్క్ స్కాలాలో వ్రాయబడింది, కాబట్టి మీరు దీన్ని ముందుగా ఇన్‌స్టాల్ చేయాలి:

sudo apt-get install scala

అధికారిక వెబ్‌సైట్ నుండి స్పార్క్ పంపిణీని డౌన్‌లోడ్ చేయండి:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

ఆర్కైవ్‌ను అన్‌ప్యాక్ చేయండి:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

బాష్ ఫైల్‌కు స్పార్క్‌కు మార్గాన్ని జోడించండి:

vim ~/.bashrc

ఎడిటర్ ద్వారా క్రింది పంక్తులను జోడించండి:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrcకి మార్పులు చేసిన తర్వాత కింది ఆదేశాన్ని అమలు చేయండి:

source ~/.bashrc

AWS PostgreSQLని అమలు చేస్తోంది

స్ట్రీమ్‌ల నుండి ప్రాసెస్ చేయబడిన సమాచారాన్ని అప్‌లోడ్ చేసే డేటాబేస్‌ను అమలు చేయడం మాత్రమే మిగిలి ఉంది. దీని కోసం మేము AWS RDS సేవను ఉపయోగిస్తాము.

AWS కన్సోల్‌కి వెళ్లండి -> AWS RDS -> డేటాబేస్‌లు -> డేటాబేస్ సృష్టించండి:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

PostgreSQLని ఎంచుకుని, తదుపరి క్లిక్ చేయండి:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

ఎందుకంటే ఈ ఉదాహరణ విద్యా ప్రయోజనాల కోసం మాత్రమే; మేము "కనీసం" (ఉచిత టైర్) ఉచిత సర్వర్‌ని ఉపయోగిస్తాము:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

తరువాత, మేము ఫ్రీ టైర్ బ్లాక్‌లో టిక్‌ను ఉంచాము మరియు ఆ తర్వాత మనకు స్వయంచాలకంగా t2.మైక్రో క్లాస్ యొక్క ఉదాహరణ అందించబడుతుంది - బలహీనంగా ఉన్నప్పటికీ, ఇది ఉచితం మరియు మా పనికి చాలా అనుకూలంగా ఉంటుంది:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

తర్వాత చాలా ముఖ్యమైన విషయాలు వస్తాయి: డేటాబేస్ ఉదాహరణ పేరు, మాస్టర్ యూజర్ పేరు మరియు అతని పాస్‌వర్డ్. ఉదాహరణకి పేరు పెట్టండి: myHabrTest, మాస్టర్ యూజర్: హాబ్ర్, పాస్వర్డ్: habr12345 మరియు తదుపరి బటన్‌పై క్లిక్ చేయండి:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

తరువాతి పేజీలో బయటి నుండి మా డేటాబేస్ సర్వర్ యొక్క యాక్సెసిబిలిటీకి (పబ్లిక్ యాక్సెసిబిలిటీ) మరియు పోర్ట్ లభ్యతకు బాధ్యత వహించే పారామితులు ఉన్నాయి:

స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

VPC భద్రతా సమూహం కోసం కొత్త సెట్టింగ్‌ని క్రియేట్ చేద్దాం, ఇది పోర్ట్ 5432 (PostgreSQL) ద్వారా మా డేటాబేస్ సర్వర్‌కు బాహ్య ప్రాప్యతను అనుమతిస్తుంది.
VPC డాష్‌బోర్డ్ -> సెక్యూరిటీ గ్రూప్‌లు -> క్రియేట్ సెక్యూరిటీ గ్రూప్ సెక్షన్‌కి ప్రత్యేక బ్రౌజర్ విండోలోని AWS కన్సోల్‌కి వెళ్దాం:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

మేము భద్రతా సమూహానికి పేరును సెట్ చేసాము - PostgreSQL, ఒక వివరణ, ఈ సమూహం ఏ VPCతో అనుబంధించబడాలో సూచించండి మరియు సృష్టించు బటన్‌ను క్లిక్ చేయండి:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

దిగువ చిత్రంలో చూపిన విధంగా, కొత్తగా సృష్టించబడిన సమూహం కోసం పోర్ట్ 5432 కోసం ఇన్‌బౌండ్ నియమాలను పూరించండి. మీరు పోర్ట్‌ను మాన్యువల్‌గా పేర్కొనలేరు, కానీ టైప్ డ్రాప్-డౌన్ జాబితా నుండి PostgreSQLని ఎంచుకోండి.

ఖచ్చితంగా చెప్పాలంటే, విలువ ::/0 అంటే ప్రపంచం నలుమూలల నుండి సర్వర్‌కి ఇన్‌కమింగ్ ట్రాఫిక్ లభ్యత, ఇది కానానికల్‌గా పూర్తిగా నిజం కాదు, కానీ ఉదాహరణను విశ్లేషించడానికి, ఈ విధానాన్ని ఉపయోగించడానికి మనల్ని మనం అనుమతించుకుందాం:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

మేము బ్రౌజర్ పేజీకి తిరిగి వస్తాము, ఇక్కడ "అధునాతన సెట్టింగ్‌లను కాన్ఫిగర్ చేయండి" తెరిచి, VPC భద్రతా సమూహాల విభాగంలో ఎంచుకోండి -> ఇప్పటికే ఉన్న VPC భద్రతా సమూహాలను ఎంచుకోండి -> PostgreSQL:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

తరువాత, డేటాబేస్ ఎంపికలలో -> డేటాబేస్ పేరు -> పేరును సెట్ చేయండి - habrDB.

మేము డిఫాల్ట్‌గా బ్యాకప్ (బ్యాకప్ నిలుపుదల వ్యవధి - 0 రోజులు), పర్యవేక్షణ మరియు పనితీరు అంతర్దృష్టులను నిలిపివేయడం మినహా మిగిలిన పారామితులను వదిలివేయవచ్చు. బటన్ పై క్లిక్ చేయండి డేటాబేస్ సృష్టించండి:
స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

థ్రెడ్ హ్యాండ్లర్

చివరి దశ స్పార్క్ జాబ్ అభివృద్ధి అవుతుంది, ఇది ప్రతి రెండు సెకన్లకు కాఫ్కా నుండి వచ్చే కొత్త డేటాను ప్రాసెస్ చేస్తుంది మరియు ఫలితాన్ని డేటాబేస్‌లోకి నమోదు చేస్తుంది.

పైన పేర్కొన్నట్లుగా, చెక్‌పాయింట్‌లు స్పార్క్‌స్ట్రీమింగ్‌లో ఒక ప్రధాన మెకానిజం, ఇది తప్పును సహించడాన్ని నిర్ధారించడానికి తప్పనిసరిగా కాన్ఫిగర్ చేయబడాలి. మేము చెక్‌పాయింట్‌లను ఉపయోగిస్తాము మరియు ప్రక్రియ విఫలమైతే, స్పార్క్ స్ట్రీమింగ్ మాడ్యూల్ చివరి చెక్‌పాయింట్‌కు తిరిగి వెళ్లి, కోల్పోయిన డేటాను పునరుద్ధరించడానికి దాని నుండి గణనలను పునఃప్రారంభించవలసి ఉంటుంది.

చెక్‌పాయింట్ సమాచారం నిల్వ చేయబడే తప్పు-తట్టుకునే, విశ్వసనీయ ఫైల్ సిస్టమ్ (HDFS, S3, మొదలైనవి)పై డైరెక్టరీని సెట్ చేయడం ద్వారా చెక్‌పాయింటింగ్ ప్రారంభించబడుతుంది. ఇది ఉపయోగించి చేయబడుతుంది, ఉదాహరణకు:

streamingContext.checkpoint(checkpointDirectory)

మా ఉదాహరణలో, మేము ఈ క్రింది విధానాన్ని ఉపయోగిస్తాము, అవి చెక్‌పాయింట్ డైరెక్టరీ ఉంటే, ఆ సందర్భం చెక్‌పాయింట్ డేటా నుండి మళ్లీ సృష్టించబడుతుంది. డైరెక్టరీ ఉనికిలో లేకుంటే (అనగా మొదటి సారి అమలు చేయబడింది), అప్పుడు కొత్త సందర్భాన్ని సృష్టించడానికి మరియు DStreams కాన్ఫిగర్ చేయడానికి functionToCreateContext అంటారు:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils లైబ్రరీ యొక్క createDirectStream పద్ధతిని ఉపయోగించి "లావాదేవీ" అంశానికి కనెక్ట్ చేయడానికి మేము డైరెక్ట్‌స్ట్రీమ్ ఆబ్జెక్ట్‌ను సృష్టిస్తాము:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON ఆకృతిలో ఇన్‌కమింగ్ డేటాను అన్వయించడం:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQLని ఉపయోగించి, మేము ఒక సాధారణ సమూహాన్ని చేస్తాము మరియు కన్సోల్‌లో ఫలితాన్ని ప్రదర్శిస్తాము:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

ప్రశ్న వచనాన్ని పొందడం మరియు స్పార్క్ SQL ద్వారా దాన్ని అమలు చేయడం:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

ఆపై మేము ఫలితంగా సమగ్ర డేటాను AWS RDSలోని పట్టికలో సేవ్ చేస్తాము. అగ్రిగేషన్ ఫలితాలను డేటాబేస్ పట్టికలో సేవ్ చేయడానికి, మేము డేటాఫ్రేమ్ ఆబ్జెక్ట్ యొక్క వ్రాత పద్ధతిని ఉపయోగిస్తాము:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDSకి కనెక్షన్‌ని సెటప్ చేయడం గురించి కొన్ని మాటలు. "AWS PostgreSQLని అమలు చేస్తోంది" దశలో మేము దాని కోసం వినియోగదారు మరియు పాస్‌వర్డ్‌ను సృష్టించాము. మీరు ఎండ్‌పాయింట్‌ని డేటాబేస్ సర్వర్ urlగా ఉపయోగించాలి, ఇది కనెక్టివిటీ & సెక్యూరిటీ విభాగంలో ప్రదర్శించబడుతుంది:

స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

స్పార్క్ మరియు కాఫ్కాను సరిగ్గా కనెక్ట్ చేయడానికి, మీరు ఆర్టిఫ్యాక్ట్‌ని ఉపయోగించి స్మార్క్-సబ్మిట్ ద్వారా జాబ్‌ని అమలు చేయాలి స్పార్క్-స్ట్రీమింగ్-కాఫ్కా-0-8_2.11. అదనంగా, మేము PostgreSQL డేటాబేస్తో పరస్పర చర్య చేయడానికి ఒక కళాఖండాన్ని కూడా ఉపయోగిస్తాము; మేము వాటిని --packages ద్వారా బదిలీ చేస్తాము.

స్క్రిప్ట్ యొక్క సౌలభ్యం కోసం, మేము సందేశ సర్వర్ పేరు మరియు మేము డేటాను స్వీకరించాలనుకుంటున్న అంశాన్ని కూడా ఇన్‌పుట్ పారామీటర్‌లుగా చేర్చుతాము.

కాబట్టి, సిస్టమ్ యొక్క కార్యాచరణను ప్రారంభించి, తనిఖీ చేయడానికి ఇది సమయం:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

అంతా పని చేసింది! మీరు దిగువ చిత్రంలో చూడగలిగినట్లుగా, అప్లికేషన్ రన్ అవుతున్నప్పుడు, కొత్త అగ్రిగేషన్ ఫలితాలు ప్రతి 2 సెకన్లకు అవుట్‌పుట్ చేయబడతాయి, ఎందుకంటే మేము StreamingContext ఆబ్జెక్ట్‌ని సృష్టించినప్పుడు బ్యాచింగ్ విరామాన్ని 2 సెకన్లకు సెట్ చేసాము:

స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

తరువాత, పట్టికలో రికార్డుల ఉనికిని తనిఖీ చేయడానికి మేము డేటాబేస్కు ఒక సాధారణ ప్రశ్న చేస్తాము లావాదేవీ_ప్రవాహం:

స్పార్క్ స్ట్రీమింగ్‌తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్

తీర్మానం

ఈ కథనం Apache Kafka మరియు PostgreSQLతో కలిపి స్పార్క్ స్ట్రీమింగ్‌ని ఉపయోగించి సమాచార ప్రసార ప్రాసెసింగ్ యొక్క ఉదాహరణను చూసింది. వివిధ మూలాల నుండి డేటా పెరుగుదలతో, స్ట్రీమింగ్ మరియు రియల్ టైమ్ అప్లికేషన్‌లను రూపొందించడానికి స్పార్క్ స్ట్రీమింగ్ యొక్క ఆచరణాత్మక విలువను అతిగా అంచనా వేయడం కష్టం.

మీరు నా రిపోజిటరీలో పూర్తి సోర్స్ కోడ్‌ని కనుగొనవచ్చు గ్యాలరీలు.

నేను ఈ కథనాన్ని చర్చించడానికి సంతోషంగా ఉన్నాను, నేను మీ వ్యాఖ్యల కోసం ఎదురు చూస్తున్నాను మరియు శ్రద్ధగల పాఠకులందరి నుండి నిర్మాణాత్మక విమర్శలను కూడా ఆశిస్తున్నాను.

మీరు విజయం సాధించాలని కోరుకుంటున్నాను!

కీర్త. మొదట్లో ఇది స్థానిక PostgreSQL డేటాబేస్‌ను ఉపయోగించాలని ప్రణాళిక చేయబడింది, కానీ AWS పట్ల నాకున్న ప్రేమను బట్టి, నేను డేటాబేస్‌ను క్లౌడ్‌కి తరలించాలని నిర్ణయించుకున్నాను. ఈ అంశంపై తదుపరి కథనంలో, AWS కినిసిస్ మరియు AWS EMR ఉపయోగించి AWSలో పైన వివరించిన మొత్తం సిస్టమ్‌ను ఎలా అమలు చేయాలో నేను చూపుతాను. వార్తలను అనుసరించండి!

మూలం: www.habr.com

ఒక వ్యాఖ్యను జోడించండి