pySpark11 - withColumn 컬럼 추가/컬럼 연산

박성현·2024년 6월 10일

pySpark

목록 보기
11/17

솔직히 ... sql이나 python함수나 사용하는데는 비슷하나 ,
python함수를 쓰는것이 리소스 사용면에서 효율적이라 하니, 공부는 해야할듯.


PySpark 함수와 Column 연산을 활용한 복잡한 열 변환 예시
예시 시나리오:

쇼핑몰 거래 데이터에서 다음과 같은 복잡한 열 변환 작업을 수행한다고 가정

  • 할인율 적용:
    상품 가격(price)에 할인율(discount_rate)을 적용하여 할인된 가격(discounted_price) 계산

  • VIP 등급 할인 추가:
    고객 등급(customer_grade)이 VIP인 경우 추가 할인율(5%)을 적용

  • 총 결제 금액 계산:
    할인된 가격에 수량(quantity)을 곱하여 총 결제 금액(total_payment) 계산

  • 결제 수단 변환:
    결제 수단(payment_method) 값을 숫자로 변환 (예: "credit_card" -> 1, "bank_transfer" -> 2)

  • 구매 시간대 추출:
    구매 시간(purchase_time)에서 시간대(hour_period) 추출 (예: "오전", "오후", "저녁")
    코드 예시:

+-----+-------------+-------------+--------+-------------+-------------------+----------------+-------------+-----------------+------------+
|price|discount_rate|customer_grade|quantity|payment_method|     purchase_time|discounted_price|total_payment|payment_method_num|hour_period|
+-----+-------------+-------------+--------+-------------+-------------------+----------------+-------------+-----------------+------------+
|10000|          0.1|          VIP|       2|  credit_card|2023-06-09 10:30:00|          8550.0|      17100.0|                1|        오전|
|25000|          0.05|       NORMAL|       1|bank_transfer|2023-06-09 19:45:00|         23750.0|      23750.0|                2|        저녁|
+-----+-------------+-------------+--------+-------------+-------------------+----------------+-------------+-----------------+------------+
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# SparkSession 생성 
spark = SparkSession.builder.appName("ComplexColumnTransformation").getOrCreate()

# 예시 데이터 (DataFrame 생성)
data = spark.createDataFrame([
    (10000, 0.1, "VIP", 2, "credit_card", "2023-06-09 10:30:00"),
    (25000, 0.05, "NORMAL", 1, "bank_transfer", "2023-06-09 19:45:00")
], ["price", "discount_rate", "customer_grade", "quantity", "payment_method", "purchase_time"])

sql vs python

# 복잡한 열 변환 작업
transformed_data = (data
    .withColumn("discounted_price", 
                F.when(F.col("customer_grade") == "VIP", 
                       F.col("price") * (1 - F.col("discount_rate") - 0.05))
                .otherwise(F.col("price") * (1 - F.col("discount_rate"))))
    .withColumn("total_payment", F.col("discounted_price") * F.col("quantity"))
    .withColumn("payment_method_num", 
                F.when(F.col("payment_method") == "credit_card", 1)
                .when(F.col("payment_method") == "bank_transfer", 2)
                .otherwise(0))  # 기타 결제 수단 처리
    .withColumn("purchase_hour", F.hour(F.to_timestamp("purchase_time")))
    .withColumn("hour_period", 
                F.when(F.col("purchase_hour") < 12, "오전")
                .when(F.col("purchase_hour") < 18, "오후")
                .otherwise("저녁"))
)

# 결과 출력
transformed_data.show()

withColumn을 사용하여 새로운 열을 추가하거나 기존 열을 변환합니다.
F.when().otherwise()를 사용하여 조건에 따라 다른 값을 할당합니다.
F.col()을 사용하여 다른 열의 값을 참조합니다.
F.hour(), F.to_timestamp() 등 다양한 PySpark 함수를 활용하여 복잡한 계산을 수행합니다.

spark.sql(
    "select * , \
    case when customer_grade == 'VIP' then (price*(1-discount_rate-0.05)) else (price*(1-discount_rate)) end as discounted_price, \
    discounted_price*quantity as total_payment,\
    case when payment_method == 'credit_card' then 1 else 2 end as payment_method_num ,\
    EXTRACT(HOUR from TO_TIMESTAMP(purchase_time)) as purchase_hour , \
    case when EXTRACT(HOUR from TO_TIMESTAMP(purchase_time)) < 12 then '오전' when EXTRACT(HOUR from TO_TIMESTAMP(purchase_time)) < 18 then '오후' else '저녁' end as hour_period \
    from dat"
).show()
profile
다소Good한 데이터 엔지니어

0개의 댓글