pyspark에서 RDD의 디테일한 데이터 가공작업시에 map은 많이 사용하는 기능이다.
모르는 이들은, python의 기본 map함수를 사용하는것과 같다고 생각하면 편하다.
rdd = spark.sparkContext.parallelize([1, 4, 9])
result = rdd.map(lambda score:score +10).collect()
print(result)
# [11, 14, 19]
또는 lambda를 사용하지 않고, 아래처럼 별도의 함수를 만들어 코드를 간단히 처리할 수도 있다.
rdd = spark.sparkContext.parallelize([1, 4, 9])
def get_updated_score(score):
return score +10
result = rdd.map(get_updated_score).collect()
print(result)
# [11, 14, 19]
때때로 복잡한 가공이 필요하여 map사용 함수가 길어지게 되는 상황이 발생할 수 있다.
그 역할에 따라 해당 함수를 다른 파일에서 불러오고싶을 수 있다.
rddmodule.py
라는 파일에 get_updated_score()
함수를 옮기고 파일을 적절히 호출한다.
def get_updated_score(score):
return score +10
from rddmodule import get_updated_score
rdd = spark.sparkContext.parallelize([1, 4, 9])
result = rdd.map(get_updated_score).collect()
print(result)
이때 아래와 같은 로그가 출력되며, 실행에 실패한다.
ModuleNotFoundError: No module named 'rddmodule'
python이 해당 모듈을 잘 못읽는건가?
찾아보면, pyspark를 실행시킬때 해당 모듈이 포함되지 않아 발생되는 문제임을 알 수 있다.
요 문제를 쉽게 해결하기 위해, 해당 파일을 먼저 실행시키는 방법이 있다.
exec(open('./rddmodule.py').read())
map을 실행하기 전에 위처럼 해당 파일을 실행하면, 모듈을 읽어오는데 아무 문제가 발생하지 않는다!
from rddmodule import get_updated_score
exec(open('./rddmodule.py').read())
rdd = spark.sparkContext.parallelize([1, 4, 9])
result = rdd.map(get_updated_score).collect()
print(result)
# [11, 14, 19]
해보지는 않았지만, SparkContext의 addPyFile()
을 사용하여 모듈을 추가할 수 있다고 한다.
Photo by Sajjad Ahmadi on Unsplash