Pyspark 최대 온도 찾기

김형수·2023년 10월 13일
0
# sample data set
...
ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
...
from pyspark import SparkConf, SparkContext

# spark setting
conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf = conf)


# 텍스트 파일로 읽어들인 RDD를 가공하기 쉽게 (key, value1, value2)로 가공하는 함수
def parseLine(line):
	# csv seperate ','
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # C -> F
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

# csv 파일을 텍스트 RDD로 생성
lines = sc.textFile("1800.csv")
# (key, **value)로 변환
parsedLines = lines.map(parseLine)
# 최대 온도를 구하기 위해 TMAX Type만을 필터링
minTemps = parsedLines.filter(lambda x: "TMAX" in x[1])
# 집계 함수를 활용하기 위해 (key, value)형식으로 변환
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
# reduceByKey 집계 함수를 사용하여 key값 별로 최대 값만 남김
minTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))
# 모든 RDD값을 반환
results = minTemps.collect();

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

0개의 댓글