[spark 3] 2-2. UDF(User Defined Function)

data_hamster·2023년 8월 2일
0

새로운 함수를 만들 때 사용.
UDF를 만들었다고 치면
withColumn 함수와 함께 사용.

스택오버플로우 개발자 설문조사 실습 때 사용해봤었다.
withColumn은 데이터프레임에서만 사용 가능

SQL에서 쓰고 싶으면, 함수 등록방법이 약간 다름.

UDF엔 두종류
하나는 Transformation 해주는거
하나는 UDAF
예를들어 groupby, sum, min 같은 집계함수. 내가 새로운 집계함수를 만들수 있음.
불행히도 pyspark에선 아직 지원하지 않고 있음.
꼭 만들고 싶다면 Scalar/java 써야함.

2개의 UDF를 만들어보고,
데이터프레임, SQL에서 사용

udf - 데이터 프레임


주어진 문자열을 대문자로 만들어주는 UDF를 만들어본다.

upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))

여기서 F.udf(lambda z: z.upper())는 사용자 정의 함수 (User-Defined Function, UDF)를 정의하고 있습니다. 이 함수는 입력값 z에 대해 z.upper()를 수행하는 함수입니다. 즉, 이 UDF는 문자열을 대문자로 변환하는 기능을 수행합니다.
그리고 df.withColumn("Curated Name", upperUDF("Name"))에서 "Name"은 DataFrame의 컬럼 이름을 나타냅니다. 따라서 upperUDF("Name")는 "Name" 컬럼의 모든 값에 대해 upperUDF 함수를 적용하라는 의미입니다. 그 결과는 "Curated Name"이라는 새로운 컬럼에 저장됩니다.

utf함수가 별도로 이미 세팅되어있음.
람다함수를 등록하고
그 함수의 오브젝트를 받아와서
withColumn를 써가지고, 새로운 필드 이름을 지정하고, 그 필드의 값을 두번째 인자에 지정해주는데, 앞서 등록한 udf 객체를 쓰고, 파라미터로 Name을 썼는데. 이는 데이터 프레임의 컬럼임.

이미 데이터프레임엔 Name이라는 필드가 있음. 이에 람다함수가 호출되어, 필드 레코드들이 들어가서 람다함수의 대문자화해서 리턴됨.

udf - spark SQL

업로드중..

def upper(s):
	return s.upper()

# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()

# 데이터 프레임 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()

앞에선 람다였지만 이번엔 파이썬 함수를 등록.
upper 함수를 파이썬 형태로 만들었음.
그다음. udf.register로 문자열로 된 이름을 주었음. 이 문자열이 스파크 sql에서 사용될 것임. 이를 upperUDF 오브젝트로 받았음. 이렇게 받은 값을 앞의 데이터프레임의 withColumn때 넘겨줘도 아무 문제 없음.

아무런 테이블 없이 결과만 어떻게 나오는지 본다 upper('aBcD')하고 .show()를 해줘야 출력이 됨.
ABCD가 리턴될 것임.

두번째 예는 데이터프레임 적용때와 동일함.
df라 가정하고 test라는 테이블 이름을 주었음.
spark.sql 메소드를 부르면서 name 필드를 select하고 그 name을 인자로 앞에 등록했던 upper라는 udf에 넘겨줌.
결과는 동일함.

업로드중..

data = [
{"a":1, "b":2},
{"a":5, "b":5}
]

df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a","b"))

인풋이 되는 데이터프레임을 구성. 필드 a, b.
새로운 컬럼 c를 만들 것임.

리스트를 spark.createDataFrame으로 만듦.
df라는 이름의 스파크 데이터프레임으로 받음.
withColumn 메소드를 호출하여, 새로운 컬럼 c를 지정. 바로 F.udf함수를 호출하면서 x+y 를 리턴해주는 람다함수를 사용함. 그 전에는 별개의 라인에 udf에 등록을 해줬거나, 객체를 만들어서 넣어줬었음. 그러면서 a와 b 컬럼을 지정해줌 a 컬럼 값이 x, b 컬럼의 값이 y가 됨. -> 새로 배운 내용.

새로만들어진 데이터 프레임이 됨.

스파크 SQL에서 적용
업로드중..

def plus(x,y):
	return x + y
    
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1,2)").show()

df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()

뭔가 sql이 훨씬 간편해 보임.
이름을 plus로 등록을 함.
spark.sql에서 호출은 plus로 하고 인자 2개를 넘겨줌.
저렇게 select로 테스트해서.show()로 보여주게할 수 있음.

앞에서 봤던 데이터 파이썬 리스트를 df로 만든걸 그대로 가져다 씀. test라는 테이블로 지정을 하고, 그 테이블에서 a, b 찍어보고 plus(a,b)로 만들어서 그 결과를 c라고 한다.

간단하게 udf를 사용해봤다

업로드중..
앞에서 만들어본 udf는 모두 스칼라 함수 집계는 아님.
가장 추천되는 방식인 판다스 udf 방식으로 udf를 만들어본다.
먼저 스칼라 그 후 집계를 만들어본다.

.str은 pandas Series의 메소드로, 문자열(string)에 대한 연산을 가능하게 하는 접근자(accessor)입니다. 이를 통해 문자열 관련 함수를 Series의 각 요소에 적용할 수 있습니다.

판다스의 Series는 1차원 배열과 같은 데이터 구조입니다. 이는 파이썬의 리스트, 딕셔너리 또는 다른 시퀀스 형태의 데이터에서 생성될 수 있습니다. Series는 다른 리스트와는 다르게 각 요소에 인덱스가 부여되어 있어서, 배열과 딕셔너리의 장점을 동시에 가지고 있습니다.
StringType()은 upper_udf2 함수의 반환 값의 데이터 유형을 명시하는 것입니다. 이 경우, StringType()이 명시되어 있기 때문에, 이 함수는 문자열 타입의 판다스 Series를 반환해야 합니다. 이렇게 명시하는 이유는 Spark가 이 함수를 호출할 때 반환 값을 올바르게 해석할 수 있도록 해줍니다.
판다스 UDF는 다양한 데이터 타입을 반환할 수 있습니다. 예를 들어, 만약 함수가 정수 타입의 판다스 Series를 반환한다면 IntegerType()을 사용할 수 있고, 실수 타입의 판다스 Series를 반환한다면 FloatType()을 사용할 수 있습니다.

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
	return s.str.upper()
    
upperUDF = spark.udf.register("upper_udf", upper_udf2)

df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) 'Curated Name' FROM test""").show()

먼저 pyspark에서 판다스 udf 모듈 임포트.
어노테이션해서 판다스 udf로 변환할 것임.

레코드 하나씩 입력받는 구조가 아니라, 레코드 집합을 입력받아 프로세싱할 것임.
판다스의 시리즈 타입으로 들어오게 되어 있음. 그렇기 때문에 판다스도 pd로 임포트함.
함수는 upper_udf2임. 어떤 한 레코드의 값이 아니라, 레코드의 집합이 들어옴. 그거를 처리를 해서 판다스 시리즈로 리턴을 함. 특정 컬럼의 타입은 스트링.

앞에서 어노테이션을 하는데 판다스 udf로 사용될꺼라고 언급.

앞의 람다함수, 일반함수보다, 퍼포먼스가 더 좋음. 하나씩 처리하는게 아니라 벌크로 처리함. 아파치 에로우? 기술을 사용함. 파이썬, JVM 오브젝트간 변환이 빠름.

만일 집계함수를 쓰면, 컬럼의 집합 -> 집합 리턴이 아닌, 집합을 받아 특정 값을 리턴함.

스칼라랑 비교했을 때 출력이 값이 하나가 되는 경우 있음.

함수를 만들었으면 이를 udf.register에 등록해줘야 함.

데이터 프레임에서 쓸 때는 저 레퍼런스를 쓰면 됨.
스파크 SQL에서 써볼 때는 정의된 함수를 써도 되고 인스턴스를 써도 되고.

공백이 들어가면 '' 붙여줘야함.

판다스 udf로 집계함수 만들기

업로드중..
평균값을 계산.
입력은 집합, 출력은 float 하나.

@pandas_udf(FloatType())
def average(v:pd.Series) -> float:
	return v.mean()
    
averageUDF = spark.udf.register('average', average)

spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()

판다스 시리즈에 보면 평균값을 계산해주는 .mean이라는 함수가 있기에 그대로 사용.
그 레퍼런스를 받아. 데이터 프레임을 사용할 때 그대로 써주면 됨.
.alias로 컬럼 이름 바꿀 수 있음.

스파크 SQL쓸 때 등록한 함수 이름 쓰고 보여주게 하면 됨.

업로드중..
다양한 UDF를 실제 코드로 써본다.
개발자 설문조사 실습했던거, 많이 써본 언어, 쓰고싶은 언어. 리스트로 있었는데 이걸 explode 했었은데, order e데이터로 해본다.
이 필드는 리스트로 되어 있음.
이 리스트들을 각각 레코드들로 쪼개본다.
UDF를 간략하게 하나 써본다.

-> 저번에 에어플로우 프로젝트 때 저렇게 JSON을 쪼갰던 기억이 난다.

profile
반갑습니다 햄스터 좋아합니다

0개의 댓글