Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Xin chào, Habr! Hôm nay chúng ta sẽ xây dựng một hệ thống xử lý các luồng thông báo Apache Kafka bằng Spark Streaming và ghi kết quả xử lý vào cơ sở dữ liệu đám mây AWS RDS.

Hãy tưởng tượng rằng một tổ chức tín dụng nào đó giao cho chúng ta nhiệm vụ xử lý các giao dịch đến một cách “nhanh chóng” trên tất cả các chi nhánh của tổ chức đó. Điều này có thể được thực hiện nhằm mục đích tính toán kịp thời trạng thái tiền tệ mở cho kho bạc, giới hạn hoặc kết quả tài chính cho các giao dịch, v.v.

Làm thế nào để thực hiện trường hợp này mà không cần sử dụng phép thuật và phép thuật - hãy đọc phần cắt! Đi!

Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming
(Nguồn hình ảnh)

Giới thiệu

Tất nhiên, việc xử lý một lượng lớn dữ liệu trong thời gian thực mang lại nhiều cơ hội sử dụng trong các hệ thống hiện đại. Một trong những sự kết hợp phổ biến nhất cho việc này là sự kết hợp giữa Apache Kafka và Spark Streaming, trong đó Kafka tạo ra một luồng các gói tin đến và Spark Streaming xử lý các gói này trong một khoảng thời gian nhất định.

Để tăng khả năng chịu lỗi của ứng dụng, chúng tôi sẽ sử dụng các điểm kiểm tra. Với cơ chế này, khi Spark Streaming engine cần khôi phục dữ liệu bị mất, nó chỉ cần quay lại điểm kiểm tra cuối cùng và tiếp tục tính toán từ đó.

Kiến trúc của hệ thống đã phát triển

Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Các thành phần được sử dụng:

  • Kafka Apache là một hệ thống nhắn tin đăng ký xuất bản phân tán. Thích hợp cho cả việc sử dụng tin nhắn ngoại tuyến và trực tuyến. Để tránh mất dữ liệu, các tin nhắn Kafka được lưu trữ trên đĩa và được sao chép trong cụm. Hệ thống Kafka được xây dựng dựa trên dịch vụ đồng bộ hóa ZooKeeper;
  • Truyền phát Spark Spark - Thành phần Spark để xử lý dữ liệu truyền phát. Mô-đun Spark Streaming được xây dựng bằng kiến ​​trúc vi lô, trong đó luồng dữ liệu được hiểu là một chuỗi các gói dữ liệu nhỏ liên tục. Spark Streaming lấy dữ liệu từ nhiều nguồn khác nhau và kết hợp thành các gói nhỏ. Các gói mới được tạo ra đều đặn. Vào đầu mỗi khoảng thời gian, một gói mới được tạo và mọi dữ liệu nhận được trong khoảng thời gian đó sẽ được đưa vào gói. Vào cuối khoảng thời gian, việc tăng trưởng gói dừng lại. Kích thước của khoảng được xác định bởi một tham số gọi là khoảng thời gian;
  • Apache Spark SQL - kết hợp xử lý quan hệ với lập trình chức năng Spark. Dữ liệu có cấu trúc nghĩa là dữ liệu có lược đồ, tức là một tập hợp các trường cho tất cả các bản ghi. Spark SQL hỗ trợ đầu vào từ nhiều nguồn dữ liệu có cấu trúc khác nhau và nhờ có sẵn thông tin lược đồ, nó chỉ có thể truy xuất một cách hiệu quả các trường bản ghi bắt buộc, đồng thời cũng cung cấp API DataFrame;
  • AWS RDS là một cơ sở dữ liệu quan hệ dựa trên đám mây tương đối rẻ tiền, dịch vụ web giúp đơn giản hóa việc thiết lập, vận hành và mở rộng quy mô và được quản lý trực tiếp bởi Amazon.

Cài đặt và chạy máy chủ Kafka

Trước khi sử dụng Kafka trực tiếp, bạn cần đảm bảo rằng bạn có Java, bởi vì... JVM được sử dụng cho công việc:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Hãy tạo một người dùng mới để làm việc với Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Tiếp theo, tải xuống bản phân phối từ trang web chính thức của Apache Kafka:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Giải nén kho lưu trữ đã tải xuống:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

Bước tiếp theo là tùy chọn. Thực tế là cài đặt mặc định không cho phép bạn sử dụng đầy đủ tất cả các tính năng của Apache Kafka. Ví dụ: xóa một chủ đề, danh mục, nhóm mà tin nhắn có thể được xuất bản. Để thay đổi điều này, hãy chỉnh sửa tệp cấu hình:

vim ~/kafka/config/server.properties

Thêm phần sau vào cuối tập tin:

delete.topic.enable = true

Trước khi khởi động máy chủ Kafka, bạn cần khởi động máy chủ ZooKeeper; chúng tôi sẽ sử dụng tập lệnh phụ trợ đi kèm với bản phân phối Kafka:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Sau khi ZooKeeper khởi động thành công, hãy khởi chạy máy chủ Kafka trong một thiết bị đầu cuối riêng biệt:

bin/kafka-server-start.sh config/server.properties

Hãy tạo một chủ đề mới gọi là Giao dịch:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Hãy đảm bảo rằng một chủ đề có số lượng phân vùng và bản sao cần thiết đã được tạo:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Hãy bỏ lỡ những khoảnh khắc thử nghiệm nhà sản xuất và người tiêu dùng đối với chủ đề mới tạo. Thông tin chi tiết hơn về cách bạn có thể kiểm tra việc gửi và nhận tin nhắn được viết trong tài liệu chính thức - Gửi một số tin nhắn. Chà, chúng ta chuyển sang viết một nhà sản xuất bằng Python bằng API KafkaProducer.

Nhà sản xuất viết

Nhà sản xuất sẽ tạo dữ liệu ngẫu nhiên - 100 tin nhắn mỗi giây. Theo dữ liệu ngẫu nhiên, chúng tôi muốn nói đến một từ điển bao gồm ba trường:

  • Chi nhánh - tên điểm bán hàng của tổ chức tín dụng;
  • Tiền tệ - tiền tệ giao dịch;
  • Số tiền - số tiền giao dịch. Số tiền sẽ là số dương nếu đó là giao dịch mua tiền tệ của Ngân hàng và là số âm nếu đó là giao dịch bán.

Mã cho nhà sản xuất trông như thế này:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

Tiếp theo, bằng cách sử dụng phương thức gửi, chúng tôi gửi tin nhắn đến máy chủ, đến chủ đề chúng tôi cần, ở định dạng JSON:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

Khi chạy tập lệnh, chúng tôi nhận được các thông báo sau trong thiết bị đầu cuối:

Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Điều này có nghĩa là mọi thứ hoạt động như chúng tôi mong muốn - nhà sản xuất tạo và gửi tin nhắn đến chủ đề chúng tôi cần.
Bước tiếp theo là cài đặt Spark và xử lý luồng thông báo này.

Cài đặt Apache Spark

Apache Spark là một nền tảng điện toán cụm phổ quát và hiệu suất cao.

Spark hoạt động tốt hơn so với các triển khai phổ biến của mô hình MapReduce đồng thời hỗ trợ nhiều loại tính toán hơn, bao gồm các truy vấn tương tác và xử lý luồng. Tốc độ đóng một vai trò quan trọng khi xử lý lượng lớn dữ liệu, vì tốc độ cho phép bạn làm việc tương tác mà không phải tốn hàng phút hoặc hàng giờ chờ đợi. Một trong những điểm mạnh lớn nhất giúp Spark có tốc độ nhanh như vậy là khả năng thực hiện các phép tính trong bộ nhớ.

Framework này được viết bằng Scala nên bạn cần cài đặt nó trước:

sudo apt-get install scala

Tải xuống bản phân phối Spark từ trang web chính thức:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Giải nén kho lưu trữ:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

Thêm đường dẫn đến Spark vào tệp bash:

vim ~/.bashrc

Thêm các dòng sau thông qua trình chỉnh sửa:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Chạy lệnh bên dưới sau khi thực hiện các thay đổi đối với bashrc:

source ~/.bashrc

Triển khai AWS PostgreSQL

Tất cả những gì còn lại là triển khai cơ sở dữ liệu để chúng tôi tải thông tin đã xử lý từ các luồng vào đó. Để làm điều này, chúng tôi sẽ sử dụng dịch vụ AWS RDS.

Chuyển đến bảng điều khiển AWS -> AWS RDS -> Cơ sở dữ liệu -> Tạo cơ sở dữ liệu:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Chọn PostgreSQL và nhấp vào Tiếp theo:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Bởi vì Ví dụ này chỉ dành cho mục đích giáo dục; chúng tôi sẽ sử dụng máy chủ miễn phí “ở mức tối thiểu” (Cấp miễn phí):
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Tiếp theo, chúng ta đánh dấu vào khối Bậc miễn phí và sau đó chúng ta sẽ tự động được cung cấp một phiên bản của lớp t2.micro - tuy yếu nhưng nó miễn phí và khá phù hợp với nhiệm vụ của chúng ta:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Tiếp theo là những điều rất quan trọng: tên của phiên bản cơ sở dữ liệu, tên của người dùng chính và mật khẩu của người đó. Hãy đặt tên cho phiên bản: myHabrTest, người dùng chính: áo khoác, mật khẩu mở khóa: habr12345 và nhấp vào nút Tiếp theo:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Trên trang tiếp theo có các tham số chịu trách nhiệm về khả năng truy cập của máy chủ cơ sở dữ liệu của chúng tôi từ bên ngoài (Khả năng truy cập công cộng) và tính khả dụng của cổng:

Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Hãy tạo một cài đặt mới cho nhóm bảo mật VPC, cài đặt này sẽ cho phép truy cập bên ngoài vào máy chủ cơ sở dữ liệu của chúng tôi qua cổng 5432 (PostgreSQL).
Chúng ta hãy truy cập bảng điều khiển AWS trong một cửa sổ trình duyệt riêng vào Bảng điều khiển VPC -> Nhóm bảo mật -> Tạo phần nhóm bảo mật:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Chúng tôi đặt tên cho nhóm Bảo mật - PostgreSQL, một mô tả, cho biết nhóm này sẽ được liên kết với VPC nào và nhấp vào nút Tạo:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Điền các quy tắc Inbound cho cổng 5432 cho nhóm mới tạo, như trong hình bên dưới. Bạn không thể chỉ định cổng theo cách thủ công nhưng hãy chọn PostgreSQL từ danh sách thả xuống Loại.

Nói đúng ra, giá trị ::/0 có nghĩa là tính khả dụng của lưu lượng truy cập đến máy chủ từ khắp nơi trên thế giới, điều này không hoàn toàn đúng về mặt kinh điển, nhưng để phân tích ví dụ, chúng ta hãy cho phép mình sử dụng phương pháp này:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Chúng tôi quay lại trang trình duyệt, nơi chúng tôi mở “Định cấu hình cài đặt nâng cao” và chọn trong phần Nhóm bảo mật VPC -> Chọn nhóm bảo mật VPC hiện có -> PostgreSQL:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Tiếp theo, trong tùy chọn Cơ sở dữ liệu -> Tên cơ sở dữ liệu -> đặt tên - habrDB.

Theo mặc định, chúng tôi có thể để lại các tham số còn lại, ngoại trừ việc tắt tính năng sao lưu (thời gian lưu giữ bản sao lưu - 0 ngày), theo dõi và Thông tin chi tiết về hiệu suất. Nhấn nút Tạo cơ sở dữ liệu:
Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Trình xử lý chủ đề

Giai đoạn cuối cùng sẽ là phát triển công việc Spark, công việc này sẽ xử lý dữ liệu mới đến từ Kafka cứ sau hai giây và nhập kết quả vào cơ sở dữ liệu.

Như đã lưu ý ở trên, điểm kiểm tra là cơ chế cốt lõi trong SparkStreaming phải được định cấu hình để đảm bảo khả năng chịu lỗi. Chúng tôi sẽ sử dụng các điểm kiểm tra và nếu quy trình không thành công, mô-đun Spark Streaming sẽ chỉ cần quay lại điểm kiểm tra cuối cùng và tiếp tục tính toán từ đó để khôi phục dữ liệu bị mất.

Điểm kiểm tra có thể được bật bằng cách đặt thư mục trên hệ thống tệp đáng tin cậy, có khả năng chịu lỗi (như HDFS, S3, v.v.) trong đó thông tin điểm kiểm tra sẽ được lưu trữ. Điều này được thực hiện bằng cách sử dụng, ví dụ:

streamingContext.checkpoint(checkpointDirectory)

Trong ví dụ của chúng tôi, chúng tôi sẽ sử dụng phương pháp sau, cụ thể là nếu checkpointDirectory tồn tại thì ngữ cảnh sẽ được tạo lại từ dữ liệu điểm kiểm tra. Nếu thư mục không tồn tại (tức là được thực thi lần đầu tiên), thì functionToCreateContext sẽ được gọi để tạo ngữ cảnh mới và định cấu hình DStream:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

Chúng ta tạo một đối tượng DirectStream để kết nối với chủ đề “giao dịch” bằng phương thức createDirectStream của thư viện KafkaUtils:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

Phân tích dữ liệu đến ở định dạng JSON:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

Sử dụng Spark SQL, chúng tôi thực hiện một nhóm đơn giản và hiển thị kết quả trong bảng điều khiển:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

Lấy văn bản truy vấn và chạy nó thông qua Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Sau đó, chúng tôi lưu dữ liệu tổng hợp thu được vào một bảng trong AWS RDS. Để lưu kết quả tổng hợp vào bảng cơ sở dữ liệu, chúng ta sẽ sử dụng phương thức ghi của đối tượng DataFrame:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

Đôi lời về việc thiết lập kết nối với AWS RDS. Chúng tôi đã tạo người dùng và mật khẩu cho nó ở bước “Triển khai AWS PostgreSQL”. Bạn nên sử dụng Endpoint làm url máy chủ cơ sở dữ liệu, được hiển thị trong phần Kết nối & bảo mật:

Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Để kết nối chính xác Spark và Kafka, bạn nên chạy công việc thông qua smark-submit bằng tạo phẩm tia lửa-streaming-kafka-0-8_2.11. Ngoài ra, chúng tôi cũng sẽ sử dụng một tạo phẩm để tương tác với cơ sở dữ liệu PostgreSQL; chúng tôi sẽ chuyển chúng qua --packages.

Để tập lệnh linh hoạt, chúng tôi cũng sẽ đưa vào làm tham số đầu vào tên của máy chủ tin nhắn và chủ đề mà chúng tôi muốn nhận dữ liệu.

Vì vậy, đã đến lúc khởi chạy và kiểm tra chức năng của hệ thống:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

Mọi thứ đã làm ra! Như bạn có thể thấy trong hình bên dưới, trong khi ứng dụng đang chạy, các kết quả tổng hợp mới sẽ được xuất ra cứ sau 2 giây, vì chúng tôi đặt khoảng thời gian phân nhóm thành 2 giây khi tạo đối tượng StreamingContext:

Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Tiếp theo, chúng ta thực hiện một truy vấn đơn giản tới cơ sở dữ liệu để kiểm tra sự hiện diện của các bản ghi trong bảng luồng giao dịch:

Apache Kafka và xử lý dữ liệu truyền phát với Spark Streaming

Kết luận

Bài viết này đã xem xét một ví dụ về xử lý luồng thông tin bằng Spark Streaming kết hợp với Apache Kafka và PostgreSQL. Với sự phát triển của dữ liệu từ nhiều nguồn khác nhau, thật khó để đánh giá quá cao giá trị thực tế của Spark Streaming trong việc tạo các ứng dụng phát trực tuyến và thời gian thực.

Bạn có thể tìm thấy mã nguồn đầy đủ trong kho lưu trữ của tôi tại GitHub.

Tôi rất vui được thảo luận về bài viết này, tôi mong nhận được ý kiến ​​​​của bạn và tôi cũng hy vọng nhận được những lời phê bình mang tính xây dựng từ tất cả những độc giả quan tâm.

Chúc các bạn thành công!

Ps. Ban đầu, người ta dự định sử dụng cơ sở dữ liệu PostgreSQL cục bộ, nhưng vì yêu thích AWS nên tôi quyết định chuyển cơ sở dữ liệu này lên đám mây. Trong bài viết tiếp theo về chủ đề này, tôi sẽ trình bày cách triển khai toàn bộ hệ thống được mô tả ở trên trong AWS bằng AWS Kinesis và AWS EMR. Theo dõi tin tức!

Nguồn: www.habr.com

Thêm một lời nhận xét