Lược đồ SparkEvolution trong thực tế

Các độc giả thân mến, một ngày tốt lành!

Trong bài viết này, nhà tư vấn hàng đầu về lĩnh vực kinh doanh Giải pháp dữ liệu lớn của Neoflex mô tả chi tiết các tùy chọn để xây dựng các tủ trưng bày có cấu trúc biến đổi bằng cách sử dụng Apache Spark.

Là một phần của dự án phân tích dữ liệu, nhiệm vụ xây dựng mặt tiền cửa hàng dựa trên dữ liệu có cấu trúc lỏng lẻo thường phát sinh.

Thông thường, đây là nhật ký hoặc phản hồi từ các hệ thống khác nhau, được lưu dưới dạng JSON hoặc XML. Dữ liệu được tải lên Hadoop, sau đó bạn cần xây dựng mặt tiền cửa hàng từ chúng. Ví dụ: chúng tôi có thể sắp xếp quyền truy cập vào phần giới thiệu đã tạo thông qua Impala.

Trong trường hợp này, sơ đồ của mặt tiền cửa hàng mục tiêu không được biết trước. Hơn nữa, sơ đồ cũng không thể được soạn thảo trước, vì nó phụ thuộc vào dữ liệu và chúng tôi đang xử lý những dữ liệu có cấu trúc rất lỏng lẻo này.

Ví dụ: hôm nay phản hồi sau được ghi lại:

{source: "app1", error_code: ""}

và ngày mai từ cùng một hệ thống sẽ có câu trả lời sau:

{source: "app1", error_code: "error", description: "Network error"}

Do đó, một trường nữa sẽ được thêm vào phần giới thiệu - mô tả và không ai biết liệu nó có đến hay không.

Nhiệm vụ tạo mặt tiền cửa hàng trên dữ liệu đó khá chuẩn và Spark có một số công cụ cho việc này. Để phân tích cú pháp dữ liệu nguồn, có hỗ trợ cho cả JSON và XML và đối với lược đồ chưa biết trước đó, hỗ trợ cho lược đồEvolution được cung cấp.

Thoạt nhìn, giải pháp có vẻ đơn giản. Bạn cần lấy một thư mục có JSON và đọc nó vào một khung dữ liệu. Spark sẽ tạo một lược đồ, biến dữ liệu lồng nhau thành cấu trúc. Hơn nữa, mọi thứ cần được lưu trong sàn gỗ, cũng được hỗ trợ trong Impala, bằng cách đăng ký mặt tiền cửa hàng trong Hive metastore.

Mọi thứ dường như trở nên đơn giản.

Tuy nhiên, không rõ ràng từ các ví dụ ngắn trong tài liệu phải làm gì với một số vấn đề trong thực tế.

Tài liệu này mô tả cách tiếp cận không phải để tạo mặt tiền cửa hàng mà để đọc JSON hoặc XML vào một khung dữ liệu.

Cụ thể, nó chỉ hiển thị cách đọc và phân tích cú pháp JSON:

df = spark.read.json(path...)

Điều này là đủ để cung cấp dữ liệu cho Spark.

Trên thực tế, tập lệnh phức tạp hơn nhiều so với việc chỉ đọc các tệp JSON từ một thư mục và tạo một khung dữ liệu. Tình huống như sau: đã có một mặt tiền cửa hàng nhất định, dữ liệu mới đến hàng ngày, chúng cần được thêm vào mặt tiền cửa hàng, đừng quên rằng sơ đồ có thể khác.

Sơ đồ thông thường để xây dựng một tủ trưng bày như sau:

Bước 1. Dữ liệu được tải vào Hadoop với quá trình tải lại hàng ngày sau đó và được thêm vào một phân vùng mới. Hóa ra một thư mục có dữ liệu ban đầu được phân vùng theo ngày.

Bước 2. Trong quá trình tải ban đầu, thư mục này được Spark đọc và phân tích cú pháp. Khung dữ liệu kết quả được lưu ở định dạng có thể phân tích cú pháp, chẳng hạn như trong sàn gỗ, sau đó có thể được nhập vào Impala. Điều này tạo ra một giới thiệu mục tiêu với tất cả dữ liệu đã tích lũy cho đến thời điểm này.

Bước 3. Một bản tải xuống được tạo sẽ cập nhật mặt tiền cửa hàng mỗi ngày.
Có một câu hỏi về tải tăng dần, sự cần thiết phải phân vùng tủ trưng bày và câu hỏi về việc duy trì sơ đồ chung của tủ trưng bày.

Hãy lấy một ví dụ. Giả sử rằng bước đầu tiên của việc xây dựng kho lưu trữ đã được triển khai và các tệp JSON được tải lên một thư mục.

Tạo một khung dữ liệu từ chúng, sau đó lưu nó dưới dạng giới thiệu, không phải là vấn đề. Đây là bước đầu tiên có thể dễ dàng tìm thấy trong tài liệu Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tất cả trông đều ổn.

Chúng tôi đọc và phân tích cú pháp JSON, sau đó chúng tôi lưu khung dữ liệu dưới dạng sàn gỗ, đăng ký nó trong Hive theo bất kỳ cách thuận tiện nào:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Chúng tôi nhận được một cửa sổ.

Tuy nhiên, ngày hôm sau, dữ liệu mới từ nguồn đã được thêm vào. Chúng tôi có một thư mục chứa JSON và một chương trình giới thiệu được tạo từ thư mục này. Sau khi tải lô dữ liệu tiếp theo từ nguồn, siêu thị dữ liệu thiếu dữ liệu của một ngày.

Giải pháp hợp lý sẽ là phân vùng mặt tiền cửa hàng theo ngày, điều này sẽ cho phép thêm một phân vùng mới vào mỗi ngày tiếp theo. Cơ chế này cũng được nhiều người biết đến, Spark cho phép bạn ghi các phân vùng riêng biệt.

Đầu tiên, chúng tôi thực hiện tải ban đầu, lưu dữ liệu như mô tả ở trên, chỉ thêm phân vùng. Hành động này được gọi là khởi tạo mặt tiền cửa hàng và chỉ được thực hiện một lần:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Ngày hôm sau, chúng tôi chỉ tải một phân vùng mới:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Tất cả những gì còn lại là đăng ký lại trong Hive để cập nhật lược đồ.
Tuy nhiên, đây là nơi phát sinh vấn đề.

Vấn đề đầu tiên. Sớm hay muộn, sàn gỗ kết quả sẽ không thể đọc được. Điều này là do cách sàn gỗ và JSON xử lý các trường trống khác nhau.

Hãy xem xét một tình huống điển hình. Ví dụ: JSON ngày hôm qua đến:

День 1: {"a": {"b": 1}},

và ngày nay, cùng một JSON trông như thế này:

День 2: {"a": null}

Giả sử chúng ta có hai phân vùng khác nhau, mỗi phân vùng có một dòng.
Khi chúng ta đọc toàn bộ dữ liệu nguồn, Spark sẽ có thể xác định loại và sẽ hiểu rằng "a" là trường thuộc loại "cấu trúc", với trường lồng nhau "b" thuộc loại INT. Tuy nhiên, nếu mỗi phân vùng được lưu riêng, thì chúng ta sẽ có một sàn gỗ với các sơ đồ phân vùng không tương thích:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Tình huống này ai cũng biết, do đó, một tùy chọn đã được thêm đặc biệt - khi phân tích cú pháp dữ liệu nguồn, hãy xóa các trường trống:

df = spark.read.json("...", dropFieldIfAllNull=True)

Trong trường hợp này, sàn gỗ sẽ bao gồm các phân vùng có thể được đọc cùng nhau.
Mặc dù những người đã làm điều này trong thực tế sẽ mỉm cười cay đắng ở đây. Tại sao? Có, bởi vì có khả năng có hai tình huống nữa. Hoặc ba. Hoặc bốn. Đầu tiên, gần như chắc chắn sẽ xảy ra, đó là các kiểu số sẽ trông khác nhau trong các tệp JSON khác nhau. Ví dụ: {intField: 1} và {intField: 1.1}. Nếu các trường như vậy được tìm thấy trong một phân vùng, thì lược đồ hợp nhất sẽ đọc chính xác mọi thứ, dẫn đến loại chính xác nhất. Nhưng nếu ở những cái khác nhau, thì một cái sẽ có intField: int và cái kia sẽ có intField: double.

Có cờ sau để xử lý tình huống này:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Bây giờ chúng tôi có một thư mục chứa các phân vùng có thể được đọc vào một khung dữ liệu duy nhất và một sàn gỗ hợp lệ của toàn bộ tủ trưng bày. Đúng? KHÔNG.

Chúng ta phải nhớ rằng chúng ta đã đăng ký bảng trong Hive. Hive không phân biệt chữ hoa chữ thường trong tên trường, trong khi sàn gỗ phân biệt chữ hoa chữ thường. Do đó, các phân vùng có lược đồ: field1: int và Field1: int giống với Hive, nhưng không giống với Spark. Đừng quên chuyển đổi tên trường thành chữ thường.

Sau đó, mọi thứ có vẻ ổn.

Tuy nhiên, không phải tất cả đều đơn giản như vậy. Có một vấn đề thứ hai, cũng được nhiều người biết đến. Vì mỗi phân vùng mới được lưu riêng nên thư mục phân vùng sẽ chứa các tệp dịch vụ Spark, chẳng hạn như cờ thành công của thao tác _SUCCESS. Điều này sẽ gây ra lỗi khi cố gắng lát gỗ. Để tránh điều này, bạn cần cấu hình cấu hình để ngăn Spark thêm tệp dịch vụ vào thư mục:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Có vẻ như bây giờ mỗi ngày một phân vùng sàn gỗ mới được thêm vào thư mục trưng bày mục tiêu, nơi chứa dữ liệu được phân tích cú pháp trong ngày. Chúng tôi đã lưu ý trước rằng không có phân vùng nào có xung đột kiểu dữ liệu.

Nhưng, chúng ta có một vấn đề thứ ba. Bây giờ, lược đồ chung không được biết, hơn nữa, bảng trong Hive có lược đồ không chính xác, vì mỗi phân vùng mới rất có thể đã đưa một biến dạng vào lược đồ.

Bạn cần phải đăng ký lại bảng. Điều này có thể được thực hiện một cách đơn giản: đọc lại sàn gỗ của mặt tiền cửa hàng, lấy giản đồ và tạo một DDL dựa trên nó, để đăng ký lại thư mục trong Hive dưới dạng bảng bên ngoài, cập nhật giản đồ của mặt tiền cửa hàng đích.

Chúng tôi có một vấn đề thứ tư. Khi chúng tôi đăng ký bàn lần đầu tiên, chúng tôi đã dựa vào Spark. Bây giờ chúng tôi tự làm điều đó và chúng tôi cần nhớ rằng các trường sàn gỗ có thể bắt đầu bằng các ký tự không được phép cho Hive. Ví dụ: Spark đưa ra các dòng mà nó không thể phân tích cú pháp trong trường "corrupt_record". Một trường như vậy không thể được đăng ký trong Hive mà không bị thoát.

Biết được điều này, chúng ta có sơ đồ:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("mảng<`", "mảng<") tạo DDL an toàn, tức là thay vì:

create table tname (_field1 string, 1field string)

Với các tên trường như "_field1, 1field", DDL an toàn được tạo khi các tên trường được thoát: tạo bảng `tname` (`_field1` string, `1field` string).

Câu hỏi đặt ra: làm thế nào để có được một khung dữ liệu đúng cách với một lược đồ hoàn chỉnh (trong mã pf)? Làm thế nào để có được pf này? Đây là vấn đề thứ năm. Đọc lại lược đồ của tất cả các phân vùng từ thư mục có tệp sàn gỗ của tủ trưng bày mục tiêu? Phương pháp này là an toàn nhất, nhưng khó khăn.

Lược đồ đã có trong Hive. Bạn có thể có một lược đồ mới bằng cách kết hợp lược đồ của toàn bộ bảng và phân vùng mới. Vì vậy, bạn cần lấy lược đồ bảng từ Hive và kết hợp nó với lược đồ của phân vùng mới. Điều này có thể được thực hiện bằng cách đọc siêu dữ liệu thử nghiệm từ Hive, lưu nó vào một thư mục tạm thời và sử dụng Spark để đọc cả hai phân vùng cùng một lúc.

Trên thực tế, có mọi thứ bạn cần: lược đồ bảng gốc trong Hive và phân vùng mới. Chúng tôi cũng có dữ liệu. Chỉ còn cách lấy một lược đồ mới kết hợp lược đồ mặt tiền cửa hàng và các trường mới từ phân vùng đã tạo:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Tiếp theo, chúng tôi tạo DDL đăng ký bảng, như trong đoạn mã trước.
Nếu toàn bộ chuỗi hoạt động chính xác, cụ thể là đã có tải khởi tạo và bảng được tạo chính xác trong Hive, thì chúng ta sẽ nhận được lược đồ bảng được cập nhật.

Và vấn đề cuối cùng là bạn không thể chỉ thêm một phân vùng vào bảng Hive, vì nó sẽ bị hỏng. Bạn cần buộc Hive sửa cấu trúc phân vùng của nó:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Nhiệm vụ đơn giản là đọc JSON và tạo mặt tiền cửa hàng dựa trên nó dẫn đến việc vượt qua một số khó khăn tiềm ẩn, các giải pháp mà bạn phải tìm kiếm riêng. Và mặc dù những giải pháp này rất đơn giản, nhưng bạn phải mất rất nhiều thời gian để tìm ra chúng.

Để thực hiện việc xây dựng tủ trưng bày, tôi phải:

  • Thêm phân vùng vào tủ trưng bày, loại bỏ tệp dịch vụ
  • Xử lý các trường trống trong dữ liệu nguồn mà Spark đã nhập
  • Truyền các loại đơn giản thành một chuỗi
  • Chuyển đổi tên trường thành chữ thường
  • Tải lên dữ liệu riêng biệt và đăng ký bảng trong Hive (tạo DDL)
  • Đừng quên thoát tên trường có thể không tương thích với Hive
  • Tìm hiểu cách cập nhật đăng ký bảng trong Hive

Tóm lại, chúng tôi lưu ý rằng quyết định xây dựng cửa sổ cửa hàng có nhiều cạm bẫy. Do đó, trong trường hợp gặp khó khăn trong việc thực hiện, tốt hơn là liên hệ với một đối tác có kinh nghiệm với chuyên môn thành công.

Cảm ơn bạn đã đọc bài viết này, chúng tôi hy vọng bạn tìm thấy thông tin hữu ích.

Nguồn: www.habr.com

Thêm một lời nhận xét