to_date, to_timestamp
- Spark 에서 제공하는 함수로, 문자열로 된 날짜를 date 혹은 timestamp 로 변환한다.
- to_date 는 날짜 포맷이 선택이지만, to_timestamp 는 날짜 포맷이 필수
// to_date 의 경우
df.withColumn("date", to_date($"time", "yyyy/MM/dd"))
// to_timestamp
df.withColumn("timestamp", to_timestamp($"time", "yyyy/MM/dd HH:mm:ss"))
- 데이터 중 날짜값에 'T' 문자열이 들어간 경우 ' 을 통해 escape 처리해주어야한다.
df.withColumn("time", to_timestamp($"time", "yyyy/MM/dd'T'HH:mm:ss"))
날짜 범위 지정
- 날짜, 타임스탬프로 비교하거나, yyyy-MM-dd 포맷 문자열로 비교
df.filter(col("time") > lit("2022-05-23")).show()
- col 내장함수 between 을 통해 기간 지정
df.where(col("time").between("2022-05-23 15:55:12", "2022-05-25 15:50:11")).show()
read 범위 축소
날짜로 파티셔닝된 데이터 중 read 할 범위 지정
## scala
import org.apache.hadoop.fs.Path
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
val startDate = "2022-01-01"
val endDate = DateTimeFormatter.ofPattern("yyyy-MM-dd").format(ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1))
val baseFolder = new Path("hdfs://localhost:9000/user/root/DataBase/parquet/")
val files = baseFolder.getFileSystem(sc.hadoopConfiguration).listStatus(baseFolder).map(_.getPath.toString)
val filteredFiles = files.filter(path => path.split("/").last >= startDate && path.split("/").last <= endDate)
val df = spark.read.csv(filteredFiles: _*)
## Python
import datetime as dt
d1 = dt.date(2022,01,01)
d2 = dt.date(2022,06,30)
date_list = [d1 + dt.timedelta(days=x) for x in range(0, (d2 - d1).days + 1)]
input_path = ",".join(["%04d%02d%02d" % (d.year, d.month, d.day) for d in date_list])
응용
- 시간 범위를 문자열로 받아 필터링하는 scala 예제
val startInput = "20220501000000"
val endInput = "20220531235959"
val inputFormat = new SimpleDateFormat("yyyyMMddHHmmss")
val fileOutputFormat = new SimpleDateFormat("yyyyMMdd")
val queryOutputFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val fileStartDate = fileOutputFormat.format(inputFormat.parse(startInput))
val endDate = fileOutputFormat.format(inputFormat.parse(endInput))
val baseFolder = new Path("hdfs://localhost:9000/DataBase/parquet/")
val files = baseFolder.getFileSystem(sc.hadoopConfiguration).listStatus(baseFolder).map(_.getPath.toString)
val filteredFiles = files.filter(path => path.split("/").last >= startDate && path.split("/").last <= endDate)
val originalDF = spark.read.csv(filteredFiles: _*)
val timestampDF = originalDF.withColumn("time", to_timestamp($"time", "yyyy/MM/dd'T'HH:mm:ss"))
val filteredDF = timestampDF.where(col("time").between(queryOutputFormat.format(inputFormat.parse(startInput)), queryOutputFormat.format(inputFormat.parse(endInput))))
reference