அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

வணக்கம், ஹப்ர்! ஸ்பார்க் ஸ்ட்ரீமிங்கைப் பயன்படுத்தி அப்பாச்சி காஃப்கா செய்தி ஸ்ட்ரீம்களை செயலாக்கும் மற்றும் செயலாக்க முடிவுகளை AWS RDS கிளவுட் தரவுத்தளத்தில் எழுதும் ஒரு அமைப்பை இன்று உருவாக்குவோம்.

ஒரு குறிப்பிட்ட கடன் நிறுவனம் அதன் அனைத்து கிளைகளிலும் உள்வரும் பரிவர்த்தனைகளை "பறக்கும்போது" செயலாக்கும் பணியை அமைக்கிறது என்று கற்பனை செய்துகொள்வோம். கருவூலத்திற்கான திறந்த நாணய நிலை, வரம்புகள் அல்லது பரிவர்த்தனைகளுக்கான நிதி முடிவுகள் போன்றவற்றை உடனடியாகக் கணக்கிடும் நோக்கத்திற்காக இதைச் செய்யலாம்.

மந்திரம் மற்றும் மந்திர மந்திரங்களைப் பயன்படுத்தாமல் இந்த வழக்கை எவ்வாறு செயல்படுத்துவது - வெட்டு கீழ் படிக்கவும்! போ!

அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்
(பட ஆதாரம்)

அறிமுகம்

நிச்சயமாக, உண்மையான நேரத்தில் அதிக அளவிலான தரவை செயலாக்குவது நவீன அமைப்புகளில் பயன்படுத்த ஏராளமான வாய்ப்புகளை வழங்குகிறது. இதற்கான மிகவும் பிரபலமான சேர்க்கைகளில் ஒன்று அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்கின் ஒருங்கிணைப்பு ஆகும், அங்கு காஃப்கா உள்வரும் செய்தி பாக்கெட்டுகளின் ஸ்ட்ரீமை உருவாக்குகிறது, மேலும் ஸ்பார்க் ஸ்ட்ரீமிங் இந்த பாக்கெட்டுகளை குறிப்பிட்ட நேர இடைவெளியில் செயலாக்குகிறது.

பயன்பாட்டின் தவறு சகிப்புத்தன்மையை அதிகரிக்க, நாங்கள் சோதனைச் சாவடிகளைப் பயன்படுத்துவோம். இந்த பொறிமுறையின் மூலம், ஸ்பார்க் ஸ்ட்ரீமிங் இன்ஜின் இழந்த தரவை மீட்டெடுக்க வேண்டியிருக்கும் போது, ​​அது கடைசி சோதனைச் சாவடிக்குச் சென்று அங்கிருந்து கணக்கீடுகளை மீண்டும் தொடங்க வேண்டும்.

வளர்ந்த அமைப்பின் கட்டிடக்கலை

அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

பயன்படுத்தப்படும் கூறுகள்:

  • அப்பாச்சி காஃப்கா விநியோகிக்கப்பட்ட வெளியீடு-சந்தா செய்தியிடல் அமைப்பு. ஆஃப்லைன் மற்றும் ஆன்லைன் செய்தி நுகர்வுக்கு ஏற்றது. தரவு இழப்பைத் தடுக்க, காஃப்கா செய்திகள் வட்டில் சேமிக்கப்பட்டு க்ளஸ்டருக்குள் பிரதியெடுக்கப்படும். காஃப்கா அமைப்பு ZooKeeper ஒத்திசைவு சேவையின் மேல் கட்டப்பட்டுள்ளது;
  • அப்பாச்சி ஸ்பார்க் ஸ்ட்ரீமிங் - ஸ்ட்ரீமிங் தரவை செயலாக்குவதற்கான ஸ்பார்க் கூறு. ஸ்பார்க் ஸ்ட்ரீமிங் தொகுதி மைக்ரோ-பேட்ச் கட்டமைப்பைப் பயன்படுத்தி கட்டமைக்கப்பட்டுள்ளது, அங்கு தரவு ஸ்ட்ரீம் சிறிய தரவு பாக்கெட்டுகளின் தொடர்ச்சியான வரிசையாக விளக்கப்படுகிறது. ஸ்பார்க் ஸ்ட்ரீமிங் வெவ்வேறு மூலங்களிலிருந்து தரவை எடுத்து சிறிய தொகுப்புகளாக இணைக்கிறது. புதிய தொகுப்புகள் சீரான இடைவெளியில் உருவாக்கப்படுகின்றன. ஒவ்வொரு நேர இடைவெளியின் தொடக்கத்திலும், ஒரு புதிய பாக்கெட் உருவாக்கப்படும், மேலும் அந்த இடைவெளியில் பெறப்பட்ட எந்த தரவுகளும் பாக்கெட்டில் சேர்க்கப்படும். இடைவெளியின் முடிவில், பாக்கெட் வளர்ச்சி நிறுத்தப்படும். இடைவெளியின் அளவு தொகுதி இடைவெளி எனப்படும் அளவுருவால் தீர்மானிக்கப்படுகிறது;
  • அப்பாச்சி ஸ்பார்க் SQL - ஸ்பார்க் செயல்பாட்டு நிரலாக்கத்துடன் தொடர்புடைய செயலாக்கத்தை ஒருங்கிணைக்கிறது. கட்டமைக்கப்பட்ட தரவு என்பது ஒரு ஸ்கீமாவைக் கொண்ட தரவு, அதாவது எல்லாப் பதிவுகளுக்கும் ஒரே புலங்களின் தொகுப்பாகும். Spark SQL ஆனது பல்வேறு கட்டமைக்கப்பட்ட தரவு மூலங்களிலிருந்து உள்ளீட்டை ஆதரிக்கிறது மற்றும் திட்டத் தகவல்களின் இருப்புக்கு நன்றி, இது தேவையான பதிவுகளின் புலங்களை மட்டுமே திறமையாக மீட்டெடுக்க முடியும், மேலும் DataFrame APIகளையும் வழங்குகிறது;
  • AWS RDS ஒப்பீட்டளவில் மலிவான கிளவுட் அடிப்படையிலான தொடர்புடைய தரவுத்தளமாகும், இது அமைவு, செயல்பாடு மற்றும் அளவிடுதல் ஆகியவற்றை எளிதாக்கும் வலை சேவையாகும், மேலும் இது Amazon ஆல் நேரடியாக நிர்வகிக்கப்படுகிறது.

காஃப்கா சேவையகத்தை நிறுவுதல் மற்றும் இயக்குதல்

காஃப்காவை நேரடியாகப் பயன்படுத்துவதற்கு முன், உங்களிடம் ஜாவா இருக்கிறதா என்பதை உறுதிப்படுத்திக் கொள்ள வேண்டும், ஏனென்றால்... JVM வேலைக்குப் பயன்படுத்தப்படுகிறது:

sudo apt-get update 
sudo apt-get install default-jre
java -version

காஃப்காவுடன் பணிபுரிய புதிய பயனரை உருவாக்குவோம்:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

அடுத்து, அதிகாரப்பூர்வ Apache Kafka இணையதளத்திலிருந்து விநியோகத்தைப் பதிவிறக்கவும்:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

பதிவிறக்கம் செய்யப்பட்ட காப்பகத்தைத் திறக்கவும்:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

அடுத்த படி விருப்பமானது. உண்மை என்னவென்றால், இயல்புநிலை அமைப்புகள் அப்பாச்சி காஃப்காவின் அனைத்து திறன்களையும் முழுமையாகப் பயன்படுத்த அனுமதிக்காது. எடுத்துக்காட்டாக, செய்திகளை வெளியிடக்கூடிய தலைப்பு, வகை, குழுவை நீக்கவும். இதை மாற்ற, உள்ளமைவு கோப்பைத் திருத்தலாம்:

vim ~/kafka/config/server.properties

கோப்பின் முடிவில் பின்வருவனவற்றைச் சேர்க்கவும்:

delete.topic.enable = true

காஃப்கா சேவையகத்தைத் தொடங்குவதற்கு முன், நீங்கள் ZooKeeper சேவையகத்தைத் தொடங்க வேண்டும்; காஃப்கா விநியோகத்துடன் வரும் துணை ஸ்கிரிப்டைப் பயன்படுத்துவோம்:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper வெற்றிகரமாக தொடங்கிய பிறகு, காஃப்கா சேவையகத்தை ஒரு தனி முனையத்தில் துவக்கவும்:

bin/kafka-server-start.sh config/server.properties

பரிவர்த்தனை என்ற புதிய தலைப்பை உருவாக்குவோம்:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

தேவையான எண்ணிக்கையிலான பகிர்வுகள் மற்றும் பிரதிகள் கொண்ட தலைப்பு உருவாக்கப்பட்டுள்ளதா என்பதை உறுதி செய்வோம்:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

புதிதாக உருவாக்கப்பட்ட தலைப்புக்காக உற்பத்தியாளரையும் நுகர்வோரையும் சோதிக்கும் தருணங்களைத் தவறவிடுவோம். செய்திகளை அனுப்புவதையும் பெறுவதையும் நீங்கள் எவ்வாறு சோதிக்கலாம் என்பது பற்றிய கூடுதல் விவரங்கள் அதிகாரப்பூர்வ ஆவணத்தில் எழுதப்பட்டுள்ளன - சில செய்திகளை அனுப்பவும். சரி, காஃப்கா புரொட்யூசர் ஏபிஐயைப் பயன்படுத்தி பைத்தானில் ஒரு தயாரிப்பாளரை எழுதத் தொடங்குகிறோம்.

தயாரிப்பாளர் எழுத்து

தயாரிப்பாளர் சீரற்ற தரவை உருவாக்குவார் - ஒவ்வொரு நொடிக்கும் 100 செய்திகள். சீரற்ற தரவு என்பது மூன்று புலங்களைக் கொண்ட அகராதியைக் குறிக்கும்:

  • கிளை - கடன் நிறுவனத்தின் விற்பனை புள்ளியின் பெயர்;
  • நாணய - பரிவர்த்தனை நாணயம்;
  • தொகை - பரிவர்த்தனை தொகை. வங்கியின் கரன்சியை வாங்கினால் அது நேர்மறை எண்ணாகவும், விற்பனையாக இருந்தால் எதிர்மறை எண்ணாகவும் இருக்கும்.

தயாரிப்பாளருக்கான குறியீடு இதுபோல் தெரிகிறது:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

அடுத்து, அனுப்பும் முறையைப் பயன்படுத்தி, சேவையகத்திற்கு, நமக்குத் தேவையான தலைப்புக்கு, JSON வடிவத்தில் ஒரு செய்தியை அனுப்புகிறோம்:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

ஸ்கிரிப்டை இயக்கும் போது, ​​பின்வரும் செய்திகளை முனையத்தில் பெறுவோம்:

அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

இதன் பொருள் அனைத்தும் நாம் விரும்பியபடி செயல்படுகின்றன - தயாரிப்பாளர் நமக்குத் தேவையான தலைப்புக்கு செய்திகளை உருவாக்கி அனுப்புகிறார்.
அடுத்த கட்டமாக ஸ்பார்க்கை நிறுவி இந்த செய்தி ஸ்ட்ரீமைச் செயல்படுத்த வேண்டும்.

Apache Spark ஐ நிறுவுகிறது

அப்பாச்சி ஸ்பார்க் உலகளாவிய மற்றும் உயர் செயல்திறன் கொண்ட கிளஸ்டர் கம்ப்யூட்டிங் தளமாகும்.

ஊடாடும் வினவல்கள் மற்றும் ஸ்ட்ரீம் செயலாக்கம் உட்பட பரந்த அளவிலான கணக்கீட்டு வகைகளை ஆதரிக்கும் போது MapReduce மாதிரியின் பிரபலமான செயலாக்கங்களை விட Spark சிறப்பாக செயல்படுகிறது. அதிக அளவிலான தரவைச் செயலாக்கும்போது வேகம் முக்கியப் பங்கு வகிக்கிறது, ஏனெனில் இது நிமிடங்களோ மணிநேரங்களோ காத்திருக்காமல் ஊடாடும் வேலை செய்ய உங்களை அனுமதிக்கும் வேகம். ஸ்பார்க்கின் மிகப்பெரிய பலங்களில் ஒன்று, நினைவகத்தில் கணக்கீடுகளைச் செய்யும் திறன் ஆகும்.

இந்த கட்டமைப்பு ஸ்கலாவில் எழுதப்பட்டுள்ளது, எனவே நீங்கள் முதலில் இதை நிறுவ வேண்டும்:

sudo apt-get install scala

அதிகாரப்பூர்வ வலைத்தளத்திலிருந்து ஸ்பார்க் விநியோகத்தைப் பதிவிறக்கவும்:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

காப்பகத்தைத் திறக்கவும்:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

பாஷ் கோப்பில் ஸ்பார்க்கிற்கான பாதையைச் சேர்க்கவும்:

vim ~/.bashrc

எடிட்டர் மூலம் பின்வரும் வரிகளைச் சேர்க்கவும்:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc இல் மாற்றங்களைச் செய்த பிறகு கீழே உள்ள கட்டளையை இயக்கவும்:

source ~/.bashrc

AWS PostgreSQL ஐப் பயன்படுத்துகிறது

தரவுத்தளத்தை வரிசைப்படுத்துவது மட்டுமே எஞ்சியுள்ளது, அதில் ஸ்ட்ரீம்களில் இருந்து செயலாக்கப்பட்ட தகவலை பதிவேற்றுவோம். இதற்கு AWS RDS சேவையைப் பயன்படுத்துவோம்.

AWS கன்சோலுக்குச் செல்லவும் -> AWS RDS -> தரவுத்தளங்கள் -> தரவுத்தளத்தை உருவாக்கவும்:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

PostgreSQL ஐத் தேர்ந்தெடுத்து அடுத்து என்பதைக் கிளிக் செய்யவும்:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

ஏனெனில் இந்த எடுத்துக்காட்டு கல்வி நோக்கங்களுக்காக மட்டுமே; நாங்கள் இலவச சேவையகத்தை "குறைந்தபட்சம்" (இலவச அடுக்கு) பயன்படுத்துவோம்:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

அடுத்து, இலவச அடுக்குத் தொகுதியில் ஒரு டிக் வைக்கிறோம், அதன் பிறகு தானாகவே t2.micro வகுப்பின் உதாரணம் வழங்கப்படும் - பலவீனமாக இருந்தாலும், இது இலவசம் மற்றும் எங்கள் பணிக்கு மிகவும் பொருத்தமானது:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

அடுத்து மிக முக்கியமான விஷயங்கள் வரும்: தரவுத்தள நிகழ்வின் பெயர், முதன்மை பயனரின் பெயர் மற்றும் அவரது கடவுச்சொல். உதாரணத்திற்கு பெயரிடுவோம்: myHabrTest, முதன்மை பயனர்: habr, கடவுச்சொல்: habr12345 அடுத்த பொத்தானைக் கிளிக் செய்யவும்:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

அடுத்த பக்கத்தில் எங்கள் தரவுத்தள சேவையகத்தை வெளியில் இருந்து அணுகுவதற்கும் (பொது அணுகல்) மற்றும் போர்ட் கிடைக்கும் தன்மைக்கும் பொறுப்பான அளவுருக்கள் உள்ளன:

அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

VPC பாதுகாப்பு குழுவிற்கு புதிய அமைப்பை உருவாக்குவோம், இது போர்ட் 5432 (PostgreSQL) வழியாக எங்கள் தரவுத்தள சேவையகத்திற்கு வெளிப்புற அணுகலை அனுமதிக்கும்.
VPC Dashboard -> Security Groups -> Create Security group பிரிவிற்கு தனி உலாவி சாளரத்தில் AWS கன்சோலுக்கு செல்லலாம்:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

பாதுகாப்புக் குழுவிற்குப் பெயரை அமைத்துள்ளோம் - PostgreSQL, ஒரு விளக்கம், இந்தக் குழு எந்த VPC உடன் இணைக்கப்பட வேண்டும் என்பதைக் குறிக்கவும் மற்றும் உருவாக்கு பொத்தானைக் கிளிக் செய்யவும்:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

கீழே உள்ள படத்தில் காட்டப்பட்டுள்ளபடி, புதிதாக உருவாக்கப்பட்ட குழுவிற்கு போர்ட் 5432 இன் உள்வரும் விதிகளை நிரப்பவும். நீங்கள் போர்ட்டை கைமுறையாகக் குறிப்பிட முடியாது, ஆனால் வகை கீழ்தோன்றும் பட்டியலில் இருந்து PostgreSQL ஐத் தேர்ந்தெடுக்கவும்.

கண்டிப்பாகச் சொன்னால், மதிப்பு ::/0 என்பது உலகம் முழுவதிலுமிருந்து சேவையகத்திற்கு உள்வரும் ட்ராஃபிக் கிடைப்பதைக் குறிக்கிறது, இது முற்றிலும் உண்மையல்ல, ஆனால் உதாரணத்தை பகுப்பாய்வு செய்ய, இந்த அணுகுமுறையைப் பயன்படுத்த அனுமதிக்கலாம்:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

நாங்கள் உலாவிப் பக்கத்திற்குத் திரும்புகிறோம், அங்கு எங்களிடம் “மேம்பட்ட அமைப்புகளை உள்ளமை” என்பதைத் திறந்து, VPC பாதுகாப்புக் குழுக்கள் பிரிவில் தேர்ந்தெடுக்கவும் -> ஏற்கனவே உள்ள VPC பாதுகாப்பு குழுக்களைத் தேர்வு செய்யவும் -> PostgreSQL:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

அடுத்து, தரவுத்தள விருப்பங்களில் -> தரவுத்தள பெயர் -> பெயரை அமைக்கவும் - habrDB.

காப்புப்பிரதியை முடக்குதல் (காப்புப் பிரதி வைத்திருத்தல் காலம் - 0 நாட்கள்), கண்காணிப்பு மற்றும் செயல்திறன் நுண்ணறிவு ஆகியவற்றைத் தவிர்த்து, மீதமுள்ள அளவுருக்களை நாம் விட்டுவிடலாம். பொத்தானை கிளிக் செய்யவும் தரவுத்தளத்தை உருவாக்கவும்:
அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

நூல் கையாளுபவர்

இறுதிக் கட்டம் ஒரு ஸ்பார்க் வேலையின் வளர்ச்சியாக இருக்கும், இது ஒவ்வொரு இரண்டு வினாடிகளுக்கும் காஃப்காவிலிருந்து வரும் புதிய தரவைச் செயலாக்கி அதன் முடிவை தரவுத்தளத்தில் உள்ளிடும்.

மேலே குறிப்பிட்டுள்ளபடி, சோதனைச் சாவடிகள் ஸ்பார்க்ஸ்ட்ரீமிங்கில் ஒரு முக்கிய பொறிமுறையாகும், அவை தவறு சகிப்புத்தன்மையை உறுதிப்படுத்த கட்டமைக்கப்பட வேண்டும். நாங்கள் சோதனைச் சாவடிகளைப் பயன்படுத்துவோம், செயல்முறை தோல்வியுற்றால், ஸ்பார்க் ஸ்ட்ரீமிங் தொகுதி கடைசி சோதனைச் சாவடிக்குத் திரும்பி, இழந்த தரவை மீட்டெடுக்க அதிலிருந்து கணக்கீடுகளை மீண்டும் தொடங்க வேண்டும்.

சோதனைச் சாவடித் தகவல் சேமிக்கப்படும், பிழையைத் தாங்கும் நம்பகமான கோப்பு முறைமையில் (HDFS, S3 போன்றவை) கோப்பகத்தை அமைப்பதன் மூலம் சோதனைச் சாவடியை இயக்கலாம். இது பயன்படுத்தி செய்யப்படுகிறது, எடுத்துக்காட்டாக:

streamingContext.checkpoint(checkpointDirectory)

எங்கள் எடுத்துக்காட்டில், பின்வரும் அணுகுமுறையைப் பயன்படுத்துவோம், அதாவது, சோதனைச் சாவடி டைரக்டரி இருந்தால், சோதனைச் சாவடி தரவிலிருந்து சூழல் மீண்டும் உருவாக்கப்படும். கோப்பகம் இல்லை என்றால் (அதாவது முதல் முறையாக செயல்படுத்தப்பட்டது), பின்னர் ஒரு புதிய சூழலை உருவாக்க மற்றும் டிஸ்ட்ரீம்களை உள்ளமைக்க functionToCreateContext அழைக்கப்படுகிறது:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils நூலகத்தின் createDirectStream முறையைப் பயன்படுத்தி "பரிவர்த்தனை" தலைப்புடன் இணைக்க DirectStream பொருளை உருவாக்குகிறோம்:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

உள்வரும் தரவை JSON வடிவத்தில் பாகுபடுத்துதல்:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL ஐப் பயன்படுத்தி, நாங்கள் ஒரு எளிய குழுவைச் செய்து முடிவை கன்சோலில் காண்பிக்கிறோம்:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

வினவல் உரையைப் பெற்று அதை Spark SQL மூலம் இயக்கவும்:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

இதன் விளைவாக திரட்டப்பட்ட தரவை AWS RDS இல் உள்ள அட்டவணையில் சேமிக்கிறோம். ஒரு தரவுத்தள அட்டவணையில் திரட்டல் முடிவுகளைச் சேமிக்க, DataFrame பொருளின் எழுதும் முறையைப் பயன்படுத்துவோம்:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS உடன் இணைப்பை அமைப்பது பற்றி சில வார்த்தைகள். "AWS PostgreSQL வரிசைப்படுத்துதல்" படிநிலையில் அதற்கான பயனர் மற்றும் கடவுச்சொல்லை உருவாக்கினோம். நீங்கள் எண்ட்பாயிண்ட்டை தரவுத்தள சேவையக url ஆகப் பயன்படுத்த வேண்டும், இது இணைப்பு மற்றும் பாதுகாப்பு பிரிவில் காட்டப்படும்:

அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

ஸ்பார்க் மற்றும் காஃப்காவை சரியாக இணைக்க, கலைப்பொருளைப் பயன்படுத்தி ஸ்மார்க்-சமர்ப்பித்தல் மூலம் வேலையை இயக்க வேண்டும். spark-streaming-kafka-0-8_2.11. கூடுதலாக, PostgreSQL தரவுத்தளத்துடன் தொடர்புகொள்வதற்கு ஒரு கலைப்பொருளையும் பயன்படுத்துவோம்; அவற்றை --packages வழியாக மாற்றுவோம்.

ஸ்கிரிப்ட்டின் நெகிழ்வுத்தன்மைக்காக, செய்தி சேவையகத்தின் பெயர் மற்றும் தரவைப் பெற விரும்பும் தலைப்பையும் உள்ளீட்டு அளவுருக்களாகச் சேர்ப்போம்.

எனவே, கணினியின் செயல்பாட்டைத் தொடங்க மற்றும் சரிபார்க்க வேண்டிய நேரம் இது:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

எல்லாம் வேலை செய்தது! கீழே உள்ள படத்தில் நீங்கள் பார்ப்பது போல், பயன்பாடு இயங்கும் போது, ​​ஒவ்வொரு 2 வினாடிகளுக்கும் புதிய திரட்டல் முடிவுகள் வெளிவரும், ஏனெனில் நாங்கள் StreamingContext ஆப்ஜெக்டை உருவாக்கும் போது 2 வினாடிகளுக்கு பேட்ச் இடைவெளியை அமைத்துள்ளோம்:

அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

அடுத்து, அட்டவணையில் உள்ள பதிவுகளின் இருப்பை சரிபார்க்க தரவுத்தளத்தில் ஒரு எளிய வினவல் செய்கிறோம் பரிவர்த்தனை_ஓட்டம்:

அப்பாச்சி காஃப்கா மற்றும் ஸ்பார்க் ஸ்ட்ரீமிங்குடன் ஸ்ட்ரீமிங் தரவு செயலாக்கம்

முடிவுக்கு

Apache Kafka மற்றும் PostgreSQL உடன் இணைந்து ஸ்பார்க் ஸ்ட்ரீமிங்கைப் பயன்படுத்தி தகவல் ஸ்ட்ரீம் செயலாக்கத்தின் உதாரணத்தை இந்தக் கட்டுரை பார்த்தது. பல்வேறு ஆதாரங்களில் இருந்து தரவுகளின் வளர்ச்சியுடன், ஸ்ட்ரீமிங் மற்றும் நிகழ்நேர பயன்பாடுகளை உருவாக்க ஸ்பார்க் ஸ்ட்ரீமிங்கின் நடைமுறை மதிப்பை மிகைப்படுத்துவது கடினம்.

எனது களஞ்சியத்தில் முழு மூலக் குறியீட்டையும் நீங்கள் காணலாம் மகிழ்ச்சியா.

இந்த கட்டுரையைப் பற்றி விவாதிப்பதில் நான் மகிழ்ச்சியடைகிறேன், உங்கள் கருத்துகளை எதிர்பார்க்கிறேன், மேலும் அக்கறையுள்ள வாசகர்களிடமிருந்து ஆக்கபூர்வமான விமர்சனங்களையும் எதிர்பார்க்கிறேன்.

நான் வெற்றி பெற விரும்புகிறேன்!

சங். ஆரம்பத்தில் இது ஒரு உள்ளூர் PostgreSQL தரவுத்தளத்தைப் பயன்படுத்த திட்டமிடப்பட்டது, ஆனால் AWS மீதான எனது அன்பைக் கருத்தில் கொண்டு, தரவுத்தளத்தை மேகக்கணிக்கு நகர்த்த முடிவு செய்தேன். இந்த தலைப்பில் அடுத்த கட்டுரையில், AWS Kinesis மற்றும் AWS EMR ஐப் பயன்படுத்தி AWS இல் மேலே விவரிக்கப்பட்ட முழு அமைப்பையும் எவ்வாறு செயல்படுத்துவது என்பதைக் காண்பிப்பேன். செய்திகளைப் பின்தொடரவும்!

ஆதாரம்: www.habr.com

கருத்தைச் சேர்