RDD map, filter 외부 모듈 함수 사용하기 in pyspark

rupert·2020년 5월 30일
0
post-thumbnail

pyspark에서 RDD의 디테일한 데이터 가공작업시에 map은 많이 사용하는 기능이다.
모르는 이들은, python의 기본 map함수를 사용하는것과 같다고 생각하면 편하다.

list에 일괄적으로 10을 더하고 이를 취하는 예시

rdd = spark.sparkContext.parallelize([1, 4, 9])
 
result = rdd.map(lambda score:score +10).collect()
print(result)
# [11, 14, 19]

또는 lambda를 사용하지 않고, 아래처럼 별도의 함수를 만들어 코드를 간단히 처리할 수도 있다.

rdd = spark.sparkContext.parallelize([1, 4, 9])

def get_updated_score(score):
    return score +10

result = rdd.map(get_updated_score).collect()
print(result)
# [11, 14, 19]

다른 python file의 함수를 map에서 호출

때때로 복잡한 가공이 필요하여 map사용 함수가 길어지게 되는 상황이 발생할 수 있다.
그 역할에 따라 해당 함수를 다른 파일에서 불러오고싶을 수 있다.

rddmodule.py 라는 파일에 get_updated_score() 함수를 옮기고 파일을 적절히 호출한다.

rddmodule.py

def get_updated_score(score):
    return score +10

main.py

from rddmodule import get_updated_score

rdd = spark.sparkContext.parallelize([1, 4, 9])

result = rdd.map(get_updated_score).collect()
print(result)

이때 아래와 같은 로그가 출력되며, 실행에 실패한다.

ModuleNotFoundError: No module named 'rddmodule'

해결방법

python이 해당 모듈을 잘 못읽는건가?

찾아보면, pyspark를 실행시킬때 해당 모듈이 포함되지 않아 발생되는 문제임을 알 수 있다.

요 문제를 쉽게 해결하기 위해, 해당 파일을 먼저 실행시키는 방법이 있다.

exec(open('./rddmodule.py').read())

map을 실행하기 전에 위처럼 해당 파일을 실행하면, 모듈을 읽어오는데 아무 문제가 발생하지 않는다!

from rddmodule import get_updated_score
exec(open('./rddmodule.py').read())

rdd = spark.sparkContext.parallelize([1, 4, 9])
result = rdd.map(get_updated_score).collect()
print(result)
# [11, 14, 19]

그 외 해결방법

해보지는 않았지만, SparkContext의 addPyFile()을 사용하여 모듈을 추가할 수 있다고 한다.

관련 질문: apache spark - pyspark addPyFile to add zip of .py files, but module still not found - Stack Overflow

썸네일 출처

Photo by Sajjad Ahmadi on Unsplash

profile
hi there

0개의 댓글