from pyspark.sql import functions as F
from pyspark.sql import Window as W
(
df
.withColumn(
"CategoryName_cnt",
F.size(
F.collect_set("CategoryName").over(W.partitionBy("Category"))
)
)
.filter(F.col("CategoryName_cnt") >= 2)
).show()
(
sample_df
.withColumn("id_set",
F.collect_set("id").over(W.partitionBy("category"))
)
.withColumn("id_list",
F.collect_list("id").over(W.partitionBy("category"))
)
).show()
+---+--------+---------+---------+
| id|category| id_set| id_list|
+---+--------+---------+---------+
| 1| b|[1, 2, 3]|[1, 2, 3]|
| 2| b|[1, 2, 3]|[1, 2, 3]|
| 3| b|[1, 2, 3]|[1, 2, 3]|
| 1| a| [1, 2]|[1, 1, 2]|
| 1| a| [1, 2]|[1, 1, 2]|
| 2| a| [1, 2]|[1, 1, 2]|
+---+--------+---------+---------+
df = (
df
.withColumn(
"CategoryName", # 원래 있는 컬럼과 동일한 이름으로 withColumn을 할 경우, 기존 컬럼에 덮어쓰기가 됩니다.
F.first(
F.col("CategoryName"), ignorenulls=True # null인 경우를 제외하고 가장 첫번째
).over(W.partitionBy(F.col("Category")).orderBy(F.col("Date").desc()))
# 다른경우가 존재하기 때문에 가장 최근 날짜 기준 이름으로 덮어쓰기
)
)
df = df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy'))
to_date
시간 형태를 위한 또 다른 매서드