대용량 CSV 데이터를 스파크로 읽어보자

Jeonghak Cho·2025년 3월 25일

Spark

목록 보기
6/12

📗대용량 CSV 데이터를 스파크로 읽어보자

Kaggle, Google Dataset Search, Data.gov 등에서 대용량 CSV 파일을 다운로드할 수 있다.
Kaggle: https://www.kaggle.com/datasets
Google Dataset Search: https://datasetsearch.research.google.com/

Kaggle에 가입하여 2GB에 달하는 고양이 품질 파일을 다운로드 하였다.
https://www.kaggle.com/datasets/denispotapov/cat-breeds-dataset-cleared

엑셀로 처리 못할 대용량 CSV 파일을 Spark로 어떻게 다루는 지 살펴볼 것이다.

  • 대용량 CSV 데이터 처리해보기
    • 엑셀로 처리되지 못할 규모의 파일 읽기
    • 데이터 건수 확인
    • 집계함수
    • VIEW 생성
    • 오브젝트 조회

🔗[목차]

CSV 다루기

Kaggle에 가입하여 2GB에 달하는 아래 내용의 파일을 다운로드 하였다.
1.3M Linkedin Jobs & Skills (2024)

압축을 해제 하니 아래 파일들이 있었다.

4 -rw-r--r--  1 ubuntu ubuntu         76 Mar 25 14:04  job_skills.csv:Zone.Identifier
4982444 -rw-r--r--  1 ubuntu ubuntu 5102016275 Mar 25 14:05  job_summary.csv
4 -rw-r--r--  1 ubuntu ubuntu         76 Mar 25 14:05  job_summary.csv:Zone.Identifier

job_summary.csv 파일 크기가 5GB에 육박한다. 엑셀로 열어보려 했더니 힘들어 하고, 응답도 하지 않는다. 작업 관리자로 종료하고 Spark-shell을 실행했다.

CSV 를 데이터프레임에 적재

5GB에 달하는 CSV를 읽는데 수초 밖에 소요되지 않았다.

scala> val df = spark.read.option("header", "true").csv("/workspace/job_summary.csv")
df: org.apache.spark.sql.DataFrame = [job_link: string, job_summary: string]

데이터 건 수 확인 하기

scala> df.count()
res3: Long = 48219735

데이터 조회하기

scala> df.select("*").show(10)
+--------------------+--------------------+
|            job_link|         job_summary|
+--------------------+--------------------+
|https://www.linke...|Rock N Roll Sushi...|
|As our Restaurant...| you?ll never be ...|
|            We Offer|                NULL|
|Competitive compe...|                NULL|
|  Insurance benefits|                NULL|
| Bonus opportunities|                NULL|
|A great work atmo...|                NULL|
|Duties/Responsibi...|                NULL|
|Ensuring that our...|                NULL|
|Maintaining opera...|                NULL|
+--------------------+--------------------+
only showing top 10 rows

아래 처럼 job_link 별로 건 수를 보려 했더니, 개인 머신의 사양으로 인해 OOM 오류를 만든다.

집계연산

df.select("*").groupBy("job_link").count().show(10,false)

java.lang.OutOfMemoryError: Java heap space

VIEW 생성 후 조회

scala> df.createOrReplaceTempView("myjob")
scala> var result = spark.sql("SELECT * FROM myjob")
result: org.apache.spark.sql.DataFrame = [job_link: string, job_summary: string]

scala> result.show(10)
+--------------------+--------------------+
|            job_link|         job_summary|
+--------------------+--------------------+
|https://www.linke...|Rock N Roll Sushi...|
|As our Restaurant...| you?ll never be ...|
|            We Offer|                NULL|
|Competitive compe...|                NULL|
|  Insurance benefits|                NULL|
| Bonus opportunities|                NULL|
|A great work atmo...|                NULL|
|Duties/Responsibi...|                NULL|
|Ensuring that our...|                NULL|
|Maintaining opera...|                NULL|
+--------------------+--------------------+
only showing top 10 rows

VIEW 목록 확인

scala> spark.sql("SHOW TABLES").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |    myjob|       true|
+---------+---------+-----------+

SPARK-SQL

I have no name!@383fd62c157d:/opt/bitnami/spark$ spark-sql
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/25 05:57:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/25 05:57:51 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
25/03/25 05:57:51 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
25/03/25 05:57:57 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/03/25 05:57:57 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.17.0.2
25/03/25 05:57:57 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
Spark Web UI available at http://383fd62c157d:4040
Spark master: local[*], Application Id: local-1742882270251
spark-sql (default)>

테이블 생성하기

spark-sql (default)> create table myjob(job_link STRING, job_summary STRING);
...
spark-warehouse/myjob specified for non-external table:myjob
Time taken: 0.733 seconds

테이블 조회하기

spark-sql (default)> show tables;
myjob
Time taken: 0.374 seconds, Fetched 1 row(s)

0개의 댓글