- Credit Original English Version written by Kyle Data Science Specialist at Info Alchemy, It's great work!
โพสต์นี้เราจะมาดูวิธีการใช้ Spark SQL Engines โดยใช้ AWS Glue ในการทำ UPSERTS, DELETES, และ INSERTS
ในเดือนนี้ AWS ออก Glue เวอร์ชั่น 3.0! AWS Glue 3.0 สิ่งที่เพิ่มมาคือ การทำให้ Performance Apache Spark 3.1 สำหรับงานที่เป็นการประมวลผลแบบแบตช์และสตรีม เพิ่มความเร็วทั้งในส่วนการทำ ingestion, processing, และ integration! ลองมาดู benchmark กัน
ได้ Thoughput เพิ่มขึ้นหลายเท่า!
Table Of Contents
- Architecture Diagram
- Pre-requisites
- Format to Delta
- Upsert
- Delete
- Insert
- Partitioned Data
- Conclusion
Architecture Diagram
Delta lake คืออะไร
Storage layer open source project ที่อำนวยความสะดวกให้เราสามารถสร้าง Lakehouse architecture บน data lake โดยคุณสมบัติประกอบไปด้วย รองรับ ACID transactions (คล้าย RDBMS), การจัดการ metadata, และสามารถสร้าง streaming, หรือ batch processing บนพวก data lakes storage ต่างๆได้เช่น Amazon S3
รายละเอียดเพิ่มเติมที่นี่
https://docs.delta.io/latest/delta-intro.html
ทฏษดีพอละ! ลอง Build เล่นกันดู! เราจะสร้าง flow ง่ายๆคือจะเอาข้อมูลใน csv file มาแล้ว upload ไปที่ S3 bucket แล้วจะ process โดยใช้ AWS Glue! หลังจากนั้นจะ Query ข้อมูลผ่าน Amazon Athena
Pre-requisites
- โหลด the Delta Lake package ลิ้งนี้ - หา file ในตารางที่ลงท้ายด้วย .jar
- AWS Account - ❗ Glue ETL ไม่อยู่ใน free tier นะจ๊ะ..
- โหลดข้อมูลก่อนเลย! ลิ้งนี้
- Codes สามารถอ้างจาก Github เจ้าของเดิมได้ ที่นี่
Format to Delta Table
อันดับแรกเราจะ convert dataset ของเราก่อน
- สร้าง Spark job จาก Glue ลิ้งนี้
- เลือก Spark script editor
- เลือก Create a new script with boilerplate code
# Import the packages
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/raw/')
# Write data as a DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/current/")
# Read Source
updatesDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/updates/')
# Write data as a DELTA TABLE
updatesDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/updates_delta/")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
หลักๆคือการ converts ตัว data เป็น delta format ซึ่งทำ 2 datasets ที่เป็นข้อมูลตั้งต้นกับข้อมูลที่จะใช้ updates. หลังจาก convert เราจะมีการสร้าง SYMLINK MANIFEST file ซึ่งเราจะใช้ในการดูผ่าน Athena
ลองสร้าง Table ใน Athena บน Delta Lake storage
CREATE EXTERNAL TABLE IF NOT EXISTS superstore (
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
ลอง Query ผ่าน Athena
select * from superstore
Upsert
Upsert คือกระบวนการในการ inserts rows เข้าไปใน db table ถ้าข้อมูลนั้นยังไม่มี หรือ update ข้อมูลถ้าข้อมูลนั้นมีแล้ว ในตัวอย่างนี้เราจะ update ค่าบาง rows ของ ship_mode, customer_name, sales, and profit columns
- สร้าง Spark job จาก Glue ลิ้งนี้
- เลือก Spark script editor
- เลือก Create a new script with boilerplate code
# Import as always
from delta import *
from pyspark.sql.session import SparkSession
# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
updateDF = spark.sql("""
MERGE INTO delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
USING delta.`s3a://delta-lake-aws-glue-demo/updates_delta/` as updates
ON superstore.row_id = updates.row_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
คำสั่งด้านบนนั้นเราะทำการ updates ข้อมูลบน table หลัก (superstore) ถ้ามีการพบข้อมูลใน updates table (updates) จากการ matching ค่าใน row_id ถ้า row_id ตรงกันจะทำการ UPDATE ALL. ถ้าไม่มีจะทำการ INSERT ALL
สามารถดูรายละเอียดได้ ที่นี่
Delete
ลองทำการลบข้อมูลกันมั่ง! Delete จะค่อนข้างตรงไปตรงมา..
- สร้าง Spark job จาก Glue ลิ้งนี้
- เลือก Spark script editor
- เลือก Create a new script with boilerplate code
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
deleteDF = spark.sql("""
DELETE
FROM delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
WHERE CAST(superstore.row_id as integer) <= 20
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
หลักๆ code ด้านบนคือลบข้อมูลโดย row_id
ลอง Query ผ่าน Athena
SELECT *
FROM "default"."superstore"
-- Need to CAST hehe bec it is currently a STRING
ORDER BY CAST(row_id as integer);
เราจะเห็น records หายไปละ Data Lake กลายเป็น Delta จริงๆ!
Insert
คล้ายๆ Deletes, Inserts ค่อนข้างตรงไปตรงมา มาลองดู
- สร้าง Spark job จาก Glue ลิ้งนี้
- เลือก Spark script editor
- เลือก Create a new script with boilerplate code
from delta import *
from pyspark.sql.session import SparkSession
spark = SparkSession \
.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
insertDF = spark.sql("""
INSERT INTO delta.`s3a://delta-lake-aws-glue-demo/current/`
SELECT *
FROM delta.`s3a://delta-lake-aws-glue-demo/updates_delta/`
WHERE CAST(row_id as integer) <= 20
""")
# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
Partitioned Data
ทำไปหมดเกือบทุกกระบวนท่า แต่นี่เป็นแค่ dataset ง่ายๆ ซึ่งชีวิตจริงเราต้องคำนึงถึงการทำ parition ด้วยนะ! ทำไมต้องทำหละ คิดดูข้อมูลเราใหญ่นะ เราต้องหั่นไปชิ้นๆเพื่อเลือกดึงเฉพาะข้อมูลที่ใช้สิ, การทำ parition นี่จะช่วยเรา 2 เรื่องใหญ่ๆเลยคือ Performance กับ Cost กับบาง services ที่ใช้ในการ Scan data อย่าง Athena
เรามีการแก้ไขบางอย่างใน Spark code ในการ write data เป็น parition ทำให้โครงสร้างการเก็บข้อมูลใน S3 เป็นแบบนี้
แนวคิดของ Delta Lake ขึ้นกับ log history โดยตัว Delta lake จะมีการสร้าง delta logs ทุกๆการ committed transactions
โดย Delta logs นั้นจะมี delta files ซึ่งเก็บเป็น JSON ซึ่งในนั้นเก็บข้อมูลของการทำที่เกิดขึ้นและรายละเอียดของ snapshpt file ล่าสุด และมีการเก็บข้อมูลเกี่ยวกับ statistics ของข้อมูลเช่นกัน
- จาก Data Floq
เช่นตัวอย่างข้างล่าง
raw date_part=2014-08-27/
current date_part=2014-08-27/ - DELETED ROWS
ถ้าเราลองเปิด parquet files ดู
เราจะเห็นว่า code เราทำให้เกิดการเขียน parquet file ใหม่ระหว่างการ delete โดย row_id ตั้งแต่ 21 ขึ้นไปไม่ได้ถูกลบตามเงื่อนไข หลังจากนั้น JSON file ก็จะทำการ map เข้ากับ file ที่เพิ่งถูกสร้างขึ้นมาใหม่ในการทำ versioning
เราสามารถสร้าง Table ที่มีการทำ partition แล้วใน Athena ตามคำสั่งด้านล่าง
CREATE EXTERNAL TABLE IF NOT EXISTS superstore (
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING,
date_part STRING
)
-- Add PARTITIONED BY option
PARTITIONED BY (date_part STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
หลังจากนั้นให้รันคำสั่งด้านล่างเพื่อทำการ update metadata ใน catalog เพื่อให้เราสามารถ query ข้อมูลใน paritions ได้
MSCK REPAIR
Top comments (0)