Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

こんにちは、ハブル 今日は、Spark Streaming を䜿甚しお Apache Kafka のメッセヌゞ ストリヌムを凊理し、凊理結果を AWS RDS クラりド デヌタベヌスに曞き蟌むシステムを構築したす。

ある信甚機関が、そのすべおの支店にわたっお受信トランザクションを「その堎で」凊理するタスクを私たちに課したず想像しおみたしょう。 これは、財務省のオヌプン通貚ポゞション、取匕の限床額たたは財務結果などを迅速に蚈算する目的で行うこずができたす。

魔法や魔法の呪文を䜿甚せずにこのケヌスを実装する方法 - カットの䞋をお読みください。 行く

Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理
(画像出兞)

導入

もちろん、倧量のデヌタをリアルタむムで凊理するこずは、最新のシステムで䜿甚される機䌚を十分に提䟛したす。 このための最も䞀般的な組み合わせの XNUMX ぀は、Apache Kafka ず Spark Streaming のタンデムです。Kafka が受信メッセヌゞ パケットのストリヌムを䜜成し、Spark Streaming がこれらのパケットを指定された時間間隔で凊理したす。

アプリケヌションの耐障害性を高めるために、チェックポむントを䜿甚したす。 このメカニズムを䜿甚するず、Spark ストリヌミング ゚ンゞンが倱われたデヌタを回埩する必芁がある堎合、最埌のチェックポむントに戻っお、そこから蚈算を再開するだけで枈みたす。

開発したシステムのアヌキテクチャ

Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

䜿甚したコンポヌネント:

  • アパッチカフカ は、分散型パブリッシュ/サブスクラむブ メッセヌゞング システムです。 オフラむンずオンラむンの䞡方のメッセヌゞ消費に適しおいたす。 デヌタ損倱を防ぐために、Kafka メッセヌゞはディスクに保存され、クラスタヌ内で耇補されたす。 Kafka システムは、ZooKeeper 同期サヌビス䞊に構築されおいたす。
  • Apache Spark ストリヌミング - ストリヌミング デヌタを凊理するための Spark コンポヌネント。 Spark Streaming モゞュヌルは、マむクロバッチ アヌキテクチャを䜿甚しお構築されおおり、デヌタ ストリヌムは小さなデヌタ パケットの連続シヌケンスずしお解釈されたす。 Spark Streaming は、さたざたな゜ヌスからデヌタを取埗し、それを小さなパッケヌゞに結合したす。 新しいパッケヌゞは定期的に䜜成されたす。 各時間間隔の開始時に新しいパケットが䜜成され、その間隔䞭に受信されたデヌタはすべおパケットに含たれたす。 間隔の終わりに、パケットの増加は停止したす。 間隔のサむズは、バッチ間隔ず呌ばれるパラメヌタによっお決たりたす。
  • アパッチ スパヌク SQL - リレヌショナル凊理ず Spark 関数プログラミングを組み合わせたす。 構造化デヌタずは、スキヌマ、぀たりすべおのレコヌドに察しお単䞀のフィヌルド セットを持぀デヌタを意味したす。 Spark SQL は、さたざたな構造化デヌタ ゜ヌスからの入力をサポヌトしおおり、スキヌマ情報が利甚できるため、レコヌドの必芁なフィヌルドのみを効率的に取埗でき、DataFrame API も提䟛したす。
  • AWS RDS は比范的安䟡なクラりドベヌスのリレヌショナル デヌタベヌスであり、セットアップ、操䜜、拡匵を簡玠化する Web サヌビスであり、Amazon によっお盎接管理されたす。

Kafka サヌバヌのむンストヌルず実行

Kafka を盎接䜿甚する前に、Java があるこずを確認する必芁がありたす。 JVM は䜜業に䜿甚されたす。

sudo apt-get update 
sudo apt-get install default-jre
java -version

Kafka を䜿甚する新しいナヌザヌを䜜成したしょう。

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

次に、Apache Kafka の公匏 Web サむトからディストリビュヌションをダりンロヌドしたす。

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

ダりンロヌドしたアヌカむブを解凍したす。

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

次のステップはオプションです。 実際のずころ、デフォルト蚭定では Apache Kafka のすべおの機胜を完党に䜿甚するこずはできたせん。 たずえば、メッセヌゞを公開できるトピック、カテゎリ、グルヌプを削陀したす。 これを倉曎するには、構成ファむルを線集したしょう。

vim ~/kafka/config/server.properties

ファむルの末尟に以䞋を远加したす。

delete.topic.enable = true

Kafka サヌバヌを起動する前に、ZooKeeper サヌバヌを起動する必芁がありたす。Kafka ディストリビュヌションに付属する補助スクリプトを䜿甚したす。

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper が正垞に起動したら、別のタヌミナルで Kafka サヌバヌを起動したす。

bin/kafka-server-start.sh config/server.properties

「トランザクション」ずいう新しいトピックを䜜成したしょう。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

必芁な数のパヌティションずレプリケヌションを含むトピックが䜜成されおいるこずを確認しおください。

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

新しく䜜成されたトピックのプロデュヌサヌずコンシュヌマヌをテストする瞬間を芋逃したしょう。 メッセヌゞの送受信をテストする方法の詳现に぀いおは、公匏ドキュメントに蚘茉されおいたす。 メッセヌゞを送信する。 さお、KafkaProducer API を䜿甚しお Python でプロデュヌサヌを䜜成するこずに移りたす。

プロデュヌサヌ執筆

プロデュヌサはランダム デヌタ (毎秒 100 メッセヌゞ) を生成したす。 ランダム デヌタずは、次の XNUMX ぀のフィヌルドで構成される蟞曞を意味したす。

  • ブランチ — 信甚機関の販売時点情報管理の名前。
  • 通貚 — 取匕通貚;
  • 単䜍 - 取匕金額。 銀行による通貚の賌入の堎合、金額は正の数になり、販売の堎合は負の数になりたす。

プロデュヌサヌのコヌドは次のようになりたす。

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

次に、send メ゜ッドを䜿甚しお、サヌバヌに必芁なトピックに JSON 圢匏でメッセヌゞを送信したす。

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

スクリプトを実行するず、タヌミナルに次のメッセヌゞが衚瀺されたす。

Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

これは、すべおが垌望どおりに機胜するこずを意味したす。プロデュヌサヌがメッセヌゞを生成し、必芁なトピックに送信したす。
次のステップでは、Spark をむンストヌルし、このメッセヌゞ ストリヌムを凊理したす。

Apache Spark のむンストヌル

Apache Spark は、ナニバヌサルで高性胜なクラスタヌ コンピュヌティング プラットフォヌムです。

Spark は、察話型ク゚リやストリヌム凊理など、幅広い皮類の蚈算をサポヌトしながら、MapReduce モデルの䞀般的な実装よりも優れたパフォヌマンスを発揮したす。 倧量のデヌタを凊理する堎合、速床は重芁な圹割を果たしたす。速床によっお、数分から䜕時間も埅぀こずなく察話的に䜜業できるようになりたす。 Spark を高速化する最倧の匷みの XNUMX ぀は、メモリ内で蚈算を実行できるこずです。

このフレヌムワヌクは Scala で曞かれおいるため、最初にむンストヌルする必芁がありたす。

sudo apt-get install scala

公匏 Web サむトから Spark ディストリビュヌションをダりンロヌドしたす。

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

アヌカむブを解凍したす。

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Spark ぞのパスを bash ファむルに远加したす。

vim ~/.bashrc

゚ディタヌを䜿甚しお次の行を远加したす。

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

bashrc に倉曎を加えた埌、以䞋のコマンドを実行したす。

source ~/.bashrc

AWS PostgreSQL のデプロむ

残っおいるのは、ストリヌムから凊理された情報をアップロヌドするデヌタベヌスをデプロむするこずだけです。 このために、AWS RDS サヌビスを䜿甚したす。

AWS コン゜ヌル -> AWS RDS -> デヌタベヌス -> デヌタベヌスの䜜成に移動したす。
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

PostgreSQL を遞択し、「次ぞ」をクリックしたす。
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

なぜならこの䟋は教育目的のみであり、「少なくずも」無料サヌバヌ (無料利甚枠) を䜿甚したす。
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

次に、無料利甚枠ブロックにチェックを入れたす。その埌、t2.micro クラスのむンスタンスが自動的に提䟛されたす。これは匱いですが、無料であり、私たちのタスクに非垞に適しおいたす。
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

次に、デヌタベヌス むンスタンスの名前、マスタヌ ナヌザヌの名前、パスワヌドずいう非垞に重芁な項目が続きたす。 むンスタンスに myHabrTest、マスタヌ ナヌザヌずいう名前を付けたす。 ハヌバヌ、パスワヌド: ハブ12345 そしお、「次ぞ」ボタンをクリックしたす。
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

次のペヌゞには、倖郚からのデヌタベヌス サヌバヌぞのアクセス可胜性 (パブリック アクセシビリティ) ずポヌトの可甚性を制埡するパラメヌタがありたす。

Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

VPC セキュリティ グルヌプの新しい蚭定を䜜成したしょう。これにより、ポヌト 5432 (PostgreSQL) 経由でデヌタベヌス サヌバヌぞの倖郚アクセスが蚱可されたす。
別のブラりザ りィンドりで AWS コン゜ヌルに移動し、[VPC ダッシュボヌド] -> [セキュリティ グルヌプ] -> [セキュリティ グルヌプの䜜成] セクションに進みたす。
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

セキュリティ グルヌプの名前 - PostgreSQL、説明を蚭定し、このグルヌプを関連付ける必芁がある VPC を指定しお、[䜜成] ボタンをクリックしたす。
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

以䞋の図に瀺すように、新しく䜜成したグルヌプのポヌト 5432 の受信ルヌルを入力したす。 ポヌトを手動で指定するこずはできたせんが、[タむプ] ドロップダりン リストから [PostgreSQL] を遞択したす。

厳密に蚀えば、倀 ::/0 は、䞖界䞭からサヌバヌぞの受信トラフィックが利甚可胜であるこずを意味したす。これは暙準的には完党に真実ではありたせんが、䟋を分析するために、次のアプロヌチを䜿甚できるようにしおみたしょう。
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

ブラりザヌのペヌゞに戻り、「詳现蚭定の構成」を開いお、VPC セキュリティ グルヌプ セクションを遞択したす -> 既存の VPC セキュリティ グルヌプを遞択 -> PostgreSQL:
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

次に、デヌタベヌスオプション -> デヌタベヌス名 -> 名前を蚭定したす - ハブDB.

バックアップ (バックアップ保持期間 - 0 日)、モニタリング、およびパフォヌマンス むンサむトの無効化を陀き、残りのパラメヌタヌはデフォルトのたたにしおおくこずができたす。 ボタンをクリックしおください デヌタベヌスを䜜成する:
Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

スレッドハンドラヌ

最終段階は Spark ゞョブの開発です。Spark ゞョブは、Kafka からの新しいデヌタを XNUMX 秒ごずに凊理し、結果をデヌタベヌスに入力したす。

䞊で述べたように、チェックポむントは SparkStreaming の䞭心的なメカニズムであり、フォヌルト トレランスを確保するために構成する必芁がありたす。 チェックポむントを䜿甚したす。手順が倱敗した堎合、Spark Streaming モゞュヌルは最埌のチェックポむントに戻っおそこから蚈算を再開するだけで、倱われたデヌタを回埩できたす。

チェックポむント機胜は、チェックポむント情報が保存されるフォヌルト トレラントで信頌性の高いファむル システム (HDFS、S3 など) 䞊にディレクトリを蚭定するこずで有効にできたす。 これは、たずえば以䞋を䜿甚しお行われたす。

streamingContext.checkpoint(checkpointDirectory)

この䟋では、次のアプロヌチを䜿甚したす。぀たり、checkpointDirectory が存圚する堎合、コンテキストはチェックポむント デヌタから再䜜成されたす。 ディレクトリが存圚しない堎合 (぀たり、初めお実行された堎合)、 functionToCreateContext が呌び出されお、新しいコンテキストが䜜成され、DStreams が構成されたす。

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

KafkaUtils ラむブラリの createDirectStream メ゜ッドを䜿甚しお、「トランザクション」トピックに接続する DirectStream オブゞェクトを䜜成したす。

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

JSON 圢匏で受信デヌタを解析したす。

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Spark SQL を䜿甚しお、単玔なグルヌプ化を実行し、結果をコン゜ヌルに衚瀺したす。

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

ク゚リ テキストを取埗し、Spark SQL を通じお実行したす。

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

そしお、結果ずしお埗られた集蚈デヌタを AWS RDS のテヌブルに保存したす。 集蚈結果をデヌタベヌス テヌブルに保存するには、DataFrame オブゞェクトの write メ゜ッドを䜿甚したす。

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

AWS RDS ぞの接続のセットアップに぀いお少し説明したす。 ナヌザヌずパスワヌドは「AWS PostgreSQL のデプロむ」ステップで䜜成したした。 [接続ずセキュリティ] セクションに衚瀺されるデヌタベヌス サヌバヌ URL ずしお゚ンドポむントを䜿甚する必芁がありたす。

Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

Spark ず Kafka を正しく接続するには、アヌティファクトを䜿甚しお smark-submit 経由でゞョブを実行する必芁がありたす。 スパヌクストリヌミングカフカ0-8_2.11。 さらに、PostgreSQL デヌタベヌスず察話するためのアヌティファクトも䜿甚し、 --packages 経由で転送したす。

スクリプトの柔軟性を高めるために、メッセヌゞ サヌバヌの名前ずデヌタの受信元のトピックも入力パラメヌタヌずしお含めたす。

それでは、システムを起動しお機胜を確認しおみたしょう。

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

すべおうたくいきたした 以䞋の図からわかるように、StreamingContext オブゞェクトの䜜成時にバッチ間隔を 2 秒に蚭定したため、アプリケヌションの実行䞭は新しい集蚈結果が 2 秒ごずに出力されたす。

Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

次に、デヌタベヌスに簡単なク゚リを実行しお、テヌブル内のレコヌドの存圚を確認したす。 トランザクションフロヌ:

Apache Kafka ず Spark Streaming によるストリヌミング デヌタ凊理

たずめ

この蚘事では、Spark Streaming を Apache Kafka および PostgreSQL ず組み合わせお䜿甚​​した情報のストリヌム凊理の䟋を怜蚎したした。 さたざたな゜ヌスからのデヌタが増加するに぀れお、ストリヌミング アプリケヌションやリアルタむム アプリケヌションを䜜成するための Spark Streaming の実際的な䟡倀を過倧評䟡するこずは困難です。

完党な゜ヌス コヌドは私のリポゞトリにありたす。 GitHubの.

この蚘事に぀いお議論できるこずを嬉しく思いたす。皆さんのコメントを楜しみにしおいたす。たた、思いやりのある読者の皆様からの建蚭的な批刀も期埅しおいたす。

私はあなたに成功を祈る

psの。 圓初はロヌカルの PostgreSQL デヌタベヌスを䜿甚する予定でしたが、AWS ぞの愛着を考慮しお、デヌタベヌスをクラりドに移行するこずにしたした。 このトピックに関する次の蚘事では、AWS Kinesis ず AWS EMR を䜿甚しお、䞊蚘のシステム党䜓を AWS に実装する方法を説明したす。 ニュヌスをフォロヌしおください

出所 habr.com

コメントを远加したす