DEV Community

Chatchai Komrangded (Bas) for AWS Community ASEAN

Posted on • Updated on

เริ่มใช้งาน SQL-based INSERTS, DELETES and UPSERTS in S3 โดยใช้ AWS Glue 3.0 และ Delta Lake

โพสต์นี้เราจะมาดูวิธีการใช้ Spark SQL Engines โดยใช้ AWS Glue ในการทำ UPSERTS, DELETES, และ INSERTS

AWS Glue คืออะไร?

ในเดือนนี้ AWS ออก Glue เวอร์ชั่น 3.0! AWS Glue 3.0 สิ่งที่เพิ่มมาคือ การทำให้ Performance Apache Spark 3.1 สำหรับงานที่เป็นการประมวลผลแบบแบตช์และสตรีม เพิ่มความเร็วทั้งในส่วนการทำ ingestion, processing, และ integration​! ลองมาดู benchmark กัน

BDB-1727-image001.jpg

BDB-1727-image005_resized2.jpg

ได้ Thoughput เพิ่มขึ้นหลายเท่า!

Table Of Contents

Architecture Diagram

delta-lake

Delta lake คืออะไร
Storage layer open source project ที่อำนวยความสะดวกให้เราสามารถสร้าง Lakehouse architecture บน data lake โดยคุณสมบัติประกอบไปด้วย รองรับ ACID transactions (คล้าย RDBMS), การจัดการ metadata, และสามารถสร้าง streaming, หรือ batch processing บนพวก data lakes storage ต่างๆได้เช่น Amazon S3

รายละเอียดเพิ่มเติมที่นี่
https://docs.delta.io/latest/delta-intro.html

ทฏษดีพอละ! ลอง Build เล่นกันดู! เราจะสร้าง flow ง่ายๆคือจะเอาข้อมูลใน csv file มาแล้ว upload ไปที่ S3 bucket แล้วจะ process โดยใช้ AWS Glue! หลังจากนั้นจะ Query ข้อมูลผ่าน Amazon Athena

Pre-requisites

  • โหลด the Delta Lake package ลิ้งนี้ - หา file ในตารางที่ลงท้ายด้วย .jar
  • AWS Account - ❗ Glue ETL ไม่อยู่ใน free tier นะจ๊ะ..
  • โหลดข้อมูลก่อนเลย! ลิ้งนี้
  • Codes สามารถอ้างจาก Github เจ้าของเดิมได้ ที่นี่

Format to Delta Table

อันดับแรกเราจะ convert dataset ของเราก่อน

  • สร้าง Spark job จาก Glue ลิ้งนี้
  • เลือก Spark script editor
  • เลือก Create a new script with boilerplate code
# Import the packages
from delta import *
from pyspark.sql.session import SparkSession

# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/raw/')

# Write data as a DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/current/")

# Read Source
updatesDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/updates/')

# Write data as a DELTA TABLE
updatesDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/updates_delta/")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
Enter fullscreen mode Exit fullscreen mode

หลักๆคือการ converts ตัว data เป็น delta format ซึ่งทำ 2 datasets ที่เป็นข้อมูลตั้งต้นกับข้อมูลที่จะใช้ updates. หลังจาก convert เราจะมีการสร้าง SYMLINK MANIFEST file ซึ่งเราจะใช้ในการดูผ่าน Athena

ลองสร้าง Table ใน Athena บน Delta Lake storage

CREATE EXTERNAL TABLE IF NOT EXISTS superstore ( 
    row_id STRING,
    order_id STRING,
    order_date STRING,
    ship_date STRING,
    ship_mode STRING,
    customer_id STRING,
    customer_name STRING,
    segment STRING,
    country STRING,
    city STRING,
    state STRING,
    postal_code STRING,
    region STRING,
    product_id STRING,
    category STRING,
    sub_category STRING,
    product_name STRING,
    sales STRING,
    quantity STRING,
    discount STRING,
    profit STRING
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
Enter fullscreen mode Exit fullscreen mode

ลอง Query ผ่าน Athena

select * from superstore
Enter fullscreen mode Exit fullscreen mode

athena

Upsert

Upsert คือกระบวนการในการ inserts rows เข้าไปใน db table ถ้าข้อมูลนั้นยังไม่มี หรือ update ข้อมูลถ้าข้อมูลนั้นมีแล้ว ในตัวอย่างนี้เราจะ update ค่าบาง rows ของ ship_mode, customer_name, sales, and profit columns

  • สร้าง Spark job จาก Glue ลิ้งนี้
  • เลือก Spark script editor
  • เลือก Create a new script with boilerplate code
# Import as always
from delta import *
from pyspark.sql.session import SparkSession

# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


updateDF = spark.sql("""

MERGE INTO delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
USING delta.`s3a://delta-lake-aws-glue-demo/updates_delta/` as updates
ON superstore.row_id = updates.row_id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *
""")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
Enter fullscreen mode Exit fullscreen mode

คำสั่งด้านบนนั้นเราะทำการ updates ข้อมูลบน table หลัก (superstore) ถ้ามีการพบข้อมูลใน updates table (updates) จากการ matching ค่าใน row_id ถ้า row_id ตรงกันจะทำการ UPDATE ALL. ถ้าไม่มีจะทำการ INSERT ALL

สามารถดูรายละเอียดได้ ที่นี่

Delete

ลองทำการลบข้อมูลกันมั่ง! Delete จะค่อนข้างตรงไปตรงมา..

  • สร้าง Spark job จาก Glue ลิ้งนี้
  • เลือก Spark script editor
  • เลือก Create a new script with boilerplate code
from delta import *
from pyspark.sql.session import SparkSession


spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


deleteDF = spark.sql("""
DELETE 
FROM delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore 
WHERE CAST(superstore.row_id as integer) <= 20
""")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
    spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
Enter fullscreen mode Exit fullscreen mode

หลักๆ code ด้านบนคือลบข้อมูลโดย row_id

ลอง Query ผ่าน Athena

SELECT * 
FROM "default"."superstore" 
-- Need to CAST hehe bec it is currently a STRING
ORDER BY CAST(row_id as integer); 
Enter fullscreen mode Exit fullscreen mode

Athena_output

เราจะเห็น records หายไปละ Data Lake กลายเป็น Delta จริงๆ!

Insert

คล้ายๆ Deletes, Inserts ค่อนข้างตรงไปตรงมา มาลองดู

  • สร้าง Spark job จาก Glue ลิ้งนี้
  • เลือก Spark script editor
  • เลือก Create a new script with boilerplate code
from delta import *
from pyspark.sql.session import SparkSession


spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


insertDF = spark.sql("""
INSERT INTO delta.`s3a://delta-lake-aws-glue-demo/current/`
SELECT *
FROM delta.`s3a://delta-lake-aws-glue-demo/updates_delta/`
WHERE CAST(row_id as integer) <= 20
""")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
    spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")
Enter fullscreen mode Exit fullscreen mode

Partitioned Data

ทำไปหมดเกือบทุกกระบวนท่า แต่นี่เป็นแค่ dataset ง่ายๆ ซึ่งชีวิตจริงเราต้องคำนึงถึงการทำ parition ด้วยนะ! ทำไมต้องทำหละ คิดดูข้อมูลเราใหญ่นะ เราต้องหั่นไปชิ้นๆเพื่อเลือกดึงเฉพาะข้อมูลที่ใช้สิ, การทำ parition นี่จะช่วยเรา 2 เรื่องใหญ่ๆเลยคือ Performance กับ Cost กับบาง services ที่ใช้ในการ Scan data อย่าง Athena

เรามีการแก้ไขบางอย่างใน Spark code ในการ write data เป็น parition ทำให้โครงสร้างการเก็บข้อมูลใน S3 เป็นแบบนี้

s3_structure

แนวคิดของ Delta Lake ขึ้นกับ log history โดยตัว Delta lake จะมีการสร้าง delta logs ทุกๆการ committed transactions

โดย Delta logs นั้นจะมี delta files ซึ่งเก็บเป็น JSON ซึ่งในนั้นเก็บข้อมูลของการทำที่เกิดขึ้นและรายละเอียดของ snapshpt file ล่าสุด และมีการเก็บข้อมูลเกี่ยวกับ statistics ของข้อมูลเช่นกัน

เช่นตัวอย่างข้างล่าง

raw date_part=2014-08-27/

date_part=2014-08-27

current date_part=2014-08-27/ - DELETED ROWS

date_part=2014-08-27_deleted

ถ้าเราลองเปิด parquet files ดู
open_parquet

เราจะเห็นว่า code เราทำให้เกิดการเขียน parquet file ใหม่ระหว่างการ delete โดย row_id ตั้งแต่ 21 ขึ้นไปไม่ได้ถูกลบตามเงื่อนไข หลังจากนั้น JSON file ก็จะทำการ map เข้ากับ file ที่เพิ่งถูกสร้างขึ้นมาใหม่ในการทำ versioning

เราสามารถสร้าง Table ที่มีการทำ partition แล้วใน Athena ตามคำสั่งด้านล่าง

CREATE EXTERNAL TABLE IF NOT EXISTS superstore ( 
    row_id STRING,
    order_id STRING,
    order_date STRING,
    ship_date STRING,
    ship_mode STRING,
    customer_id STRING,
    customer_name STRING,
    segment STRING,
    country STRING,
    city STRING,
    state STRING,
    postal_code STRING,
    region STRING,
    product_id STRING,
    category STRING,
    sub_category STRING,
    product_name STRING,
    sales STRING,
    quantity STRING,
    discount STRING,
    profit STRING,
    date_part STRING

)
-- Add PARTITIONED BY option
PARTITIONED BY (date_part STRING)

ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'
Enter fullscreen mode Exit fullscreen mode

หลังจากนั้นให้รันคำสั่งด้านล่างเพื่อทำการ update metadata ใน catalog เพื่อให้เราสามารถ query ข้อมูลใน paritions ได้

MSCK REPAIR

to add the partitions.

✅ Conclusion

จบแล้ว!
good_stuff

Discussion (0)