[Spark] Pyspark UDF 함수에 변수를 넘겨주어 실행하는 방법들

NewNewDaddy·2023년 11월 3일
0

SPARK

목록 보기
5/17
post-thumbnail

0. INTRO

  • Pyspark 에서 UDF(User Defined Function)은 Dataframe의 특정 컬럼을 사용자가 원하는 형태로 가공할 수 있는 아주 매력적인 기능이다.
  • 정의하는 방법에는 데코레이터를 사용하거나 함수를 직접 명시해주는 등의 몇가지 방법들이 있는데 이번 글에서는 사용자가 지정한 함수에 특정 변수 List를 넘겨주어 UDF를 적용하는 방법들을 알아볼 것이다.

1. 본문

  • 본문 다뤄볼 함수는 넘겨받은 List 내의 단어들이 특정 컬럼에 존재하면 이를 공백으로 replace 시켜주는 UDF이다.

1. 직접 명시를 통한 UDF 정의

param_list = [.....]

# UDF에 list를 바로 넘겨줌
def processor(val, param = param_list):
    
    for i in param:
        val = val.replace(i, '')
    
    return val

# UDF 직접 명시
udf_function = F.udf(processor, T.StringType())

df.withcolumn('new_col', udf_function(F.col('old_col')))

2. Decorator를 통한 UDF 정의

param_list = [.....]

# 데코레이터를 통하여 UDF 정의해준다.
@F.udf(returnType=T.StringType())
# UDF에 list를 바로 넘겨줌
def processor(val, param = param_list):
    
    for i in param:
        val = val.replace(i, '')
    
    return val

# 데코레이터로 정의해준 함수 이름을 바로 사용
df.withcolumn('new_col', processor(F.col('old_col')))

3. Lambda 함수를 통한 UDF 정의

  • 위의 두 방법은 넘겨줄 list를 UDF 함수의 변수로 바로 넘겨주었기 때문에 함수 작성 전에 list가 먼저 정의되어 있어야했다.
  • 함수를 생성한 이후 나중에 list를 정의하고 UDF로 등록하면서 list를 변수로 같이 넘겨주는 포맷이 훨씬 사용하기 용이한데 이는 Lambda함수를 통해 적용시킬 수 있다.
# UDF 함수를 먼저 생성한다.
def processor(val, param):
    
    for i in param:
        val = val.replace(i, '')
    
    return val

# processor 함수에 변수를 넘겨주며 실행될 수 있도록 함수를 하나 더 정의한다.
def udf_function(params):
    
    return F.udf(lambda a: processor(a, params))

# 넘겨줄 list 정의
param_list = [.....]

# 함수(변수 list)(컬럼) 이런 순서로 명시하여 사용
df.withColumn('new_col', udf_function(param_list)(F.col('old_col')))
profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글