[Spark] 날짜 다루기 (date, timestamp)

Woong·2022년 5월 23일
0

Apache Spark

목록 보기
14/22
post-custom-banner

to_date, to_timestamp

  • Spark 에서 제공하는 함수로, 문자열로 된 날짜를 date 혹은 timestamp 로 변환한다.
    • to_date 는 날짜 포맷이 선택이지만, to_timestamp 는 날짜 포맷이 필수
// to_date 의 경우 
df.withColumn("date", to_date($"time", "yyyy/MM/dd"))
// to_timestamp
df.withColumn("timestamp", to_timestamp($"time", "yyyy/MM/dd HH:mm:ss"))
  • 데이터 중 날짜값에 'T' 문자열이 들어간 경우 ' 을 통해 escape 처리해주어야한다.
df.withColumn("time", to_timestamp($"time", "yyyy/MM/dd'T'HH:mm:ss"))

날짜 범위 지정

  • 날짜, 타임스탬프로 비교하거나, yyyy-MM-dd 포맷 문자열로 비교
df.filter(col("time") > lit("2022-05-23")).show()
  • col 내장함수 between 을 통해 기간 지정
df.where(col("time").between("2022-05-23 15:55:12", "2022-05-25 15:50:11")).show()

read 범위 축소

날짜로 파티셔닝된 데이터 중 read 할 범위 지정

  • scala 예제
## scala

import org.apache.hadoop.fs.Path
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

val startDate = "2022-01-01"
val endDate = DateTimeFormatter.ofPattern("yyyy-MM-dd").format(ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1))

val baseFolder = new Path("hdfs://localhost:9000/user/root/DataBase/parquet/")

val files = baseFolder.getFileSystem(sc.hadoopConfiguration).listStatus(baseFolder).map(_.getPath.toString)
val filteredFiles = files.filter(path => path.split("/").last >= startDate &&  path.split("/").last <= endDate)

val df = spark.read.csv(filteredFiles: _*) 
  • python 예제
## Python 
import datetime as dt
d1 = dt.date(2022,01,01)
d2 = dt.date(2022,06,30)

date_list = [d1 + dt.timedelta(days=x) for x in range(0, (d2 - d1).days + 1)]
input_path = ",".join(["%04d%02d%02d" % (d.year, d.month, d.day) for d in  date_list])

응용

  • 시간 범위를 문자열로 받아 필터링하는 scala 예제
val startInput = "20220501000000"
val endInput = "20220531235959"

val inputFormat = new SimpleDateFormat("yyyyMMddHHmmss")
val fileOutputFormat = new SimpleDateFormat("yyyyMMdd")
val queryOutputFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val fileStartDate = fileOutputFormat.format(inputFormat.parse(startInput))
val endDate = fileOutputFormat.format(inputFormat.parse(endInput))

val baseFolder = new Path("hdfs://localhost:9000/DataBase/parquet/")
val files = baseFolder.getFileSystem(sc.hadoopConfiguration).listStatus(baseFolder).map(_.getPath.toString)
val filteredFiles = files.filter(path => path.split("/").last >= startDate &&  path.split("/").last <= endDate)
	
val originalDF = spark.read.csv(filteredFiles: _*)
val timestampDF = originalDF.withColumn("time", to_timestamp($"time", "yyyy/MM/dd'T'HH:mm:ss"))

val filteredDF = timestampDF.where(col("time").between(queryOutputFormat.format(inputFormat.parse(startInput)), queryOutputFormat.format(inputFormat.parse(endInput))))

reference

post-custom-banner

0개의 댓글