DEV Community

Manoranjan
Manoranjan

Posted on

Transpile a SQL Script into PySpark DataFrame API equivalent code

Input SQL Script (assume any dialect) :

SELECT b.se10,
       b.se3,
       b.se_aggrtr_indctr,
       b.key_swipe_ind
FROM
  (SELECT se10,
          se3,
          se_aggrtr_indctr,
          ROW_NUMBER() OVER (PARTITION BY SE10
                             ORDER BY se_aggrtr_indctr DESC) AS rn,
          key_swipe_ind
   FROM fraud_details_data_whole
   GROUP BY se10,
            se3,
            se_aggrtr_indctr ,
            key_swipe_ind) b
WHERE b.rn<2
Enter fullscreen mode Exit fullscreen mode

Output PySpark Code using DataFrame API :

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Create a SparkSession
spark = SparkSession.builder \
    .appName("TranspileSQLExample") \
    .getOrCreate()

# Sample data (replace with your actual DataFrame)
data = [
    ("se10_value1", "se3_value1", "aggrtr_value1", "swipe_value1"),
    ("se10_value1", "se3_value2", "aggrtr_value2", "swipe_value2"),
    ("se10_value2", "se3_value3", "aggrtr_value3", "swipe_value3"),
    ("se10_value2", "se3_value4", "aggrtr_value4", "swipe_value4"),
    ("se10_value3", "se3_value5", "aggrtr_value5", "swipe_value5"),
    ("se10_value3", "se3_value6", "aggrtr_value6", "swipe_value6")
]

# Create DataFrame
fraud_details_data_whole = spark.createDataFrame(data, ["se10", "se3", "se_aggrtr_indctr", "key_swipe_ind"])

# Define Window specification for row_number() function
windowSpec = Window.partitionBy("se10").orderBy(fraud_details_data_whole["se_aggrtr_indctr"].desc())

# Add row number column
fraud_details_data_whole = fraud_details_data_whole.withColumn("rn", row_number().over(windowSpec))

# Filter rows where rn < 2
result_df = fraud_details_data_whole.filter("rn < 2")

# Select required columns
result_df = result_df.select("se10", "se3", "se_aggrtr_indctr", "key_swipe_ind")

# Show the result DataFrame
result_df.show()
Enter fullscreen mode Exit fullscreen mode

Is there any way to translate the above Sql Query into its equivalent PySpark DataFrame API code? The result must be equal when we execute Sql Script and its transpiled PySpark code separately.
Please suggest

Top comments (0)