솔직히 ... sql이나 python함수나 사용하는데는 비슷하나 ,
python함수를 쓰는것이 리소스 사용면에서 효율적이라 하니, 공부는 해야할듯.
PySpark 함수와 Column 연산을 활용한 복잡한 열 변환 예시
예시 시나리오:
쇼핑몰 거래 데이터에서 다음과 같은 복잡한 열 변환 작업을 수행한다고 가정
할인율 적용:
상품 가격(price)에 할인율(discount_rate)을 적용하여 할인된 가격(discounted_price) 계산
VIP 등급 할인 추가:
고객 등급(customer_grade)이 VIP인 경우 추가 할인율(5%)을 적용
총 결제 금액 계산:
할인된 가격에 수량(quantity)을 곱하여 총 결제 금액(total_payment) 계산
결제 수단 변환:
결제 수단(payment_method) 값을 숫자로 변환 (예: "credit_card" -> 1, "bank_transfer" -> 2)
구매 시간대 추출:
구매 시간(purchase_time)에서 시간대(hour_period) 추출 (예: "오전", "오후", "저녁")
코드 예시:
+-----+-------------+-------------+--------+-------------+-------------------+----------------+-------------+-----------------+------------+
|price|discount_rate|customer_grade|quantity|payment_method| purchase_time|discounted_price|total_payment|payment_method_num|hour_period|
+-----+-------------+-------------+--------+-------------+-------------------+----------------+-------------+-----------------+------------+
|10000| 0.1| VIP| 2| credit_card|2023-06-09 10:30:00| 8550.0| 17100.0| 1| 오전|
|25000| 0.05| NORMAL| 1|bank_transfer|2023-06-09 19:45:00| 23750.0| 23750.0| 2| 저녁|
+-----+-------------+-------------+--------+-------------+-------------------+----------------+-------------+-----------------+------------+
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
# SparkSession 생성
spark = SparkSession.builder.appName("ComplexColumnTransformation").getOrCreate()
# 예시 데이터 (DataFrame 생성)
data = spark.createDataFrame([
(10000, 0.1, "VIP", 2, "credit_card", "2023-06-09 10:30:00"),
(25000, 0.05, "NORMAL", 1, "bank_transfer", "2023-06-09 19:45:00")
], ["price", "discount_rate", "customer_grade", "quantity", "payment_method", "purchase_time"])
# 복잡한 열 변환 작업
transformed_data = (data
.withColumn("discounted_price",
F.when(F.col("customer_grade") == "VIP",
F.col("price") * (1 - F.col("discount_rate") - 0.05))
.otherwise(F.col("price") * (1 - F.col("discount_rate"))))
.withColumn("total_payment", F.col("discounted_price") * F.col("quantity"))
.withColumn("payment_method_num",
F.when(F.col("payment_method") == "credit_card", 1)
.when(F.col("payment_method") == "bank_transfer", 2)
.otherwise(0)) # 기타 결제 수단 처리
.withColumn("purchase_hour", F.hour(F.to_timestamp("purchase_time")))
.withColumn("hour_period",
F.when(F.col("purchase_hour") < 12, "오전")
.when(F.col("purchase_hour") < 18, "오후")
.otherwise("저녁"))
)
# 결과 출력
transformed_data.show()
withColumn을 사용하여 새로운 열을 추가하거나 기존 열을 변환합니다.
F.when().otherwise()를 사용하여 조건에 따라 다른 값을 할당합니다.
F.col()을 사용하여 다른 열의 값을 참조합니다.
F.hour(), F.to_timestamp() 등 다양한 PySpark 함수를 활용하여 복잡한 계산을 수행합니다.
spark.sql(
"select * , \
case when customer_grade == 'VIP' then (price*(1-discount_rate-0.05)) else (price*(1-discount_rate)) end as discounted_price, \
discounted_price*quantity as total_payment,\
case when payment_method == 'credit_card' then 1 else 2 end as payment_method_num ,\
EXTRACT(HOUR from TO_TIMESTAMP(purchase_time)) as purchase_hour , \
case when EXTRACT(HOUR from TO_TIMESTAMP(purchase_time)) < 12 then '오전' when EXTRACT(HOUR from TO_TIMESTAMP(purchase_time)) < 18 then '오후' else '저녁' end as hour_period \
from dat"
).show()