Loading 48 million rows from a roughly 5 GB file into MariaDB took just over 9 minutes. The figure will vary widely with the system environment (CPU speed, number of cores, memory, number of nodes).
Both applications (MariaDB and Spark) run as Docker containers, and they must be attached to the same Docker network so they can reach each other:
docker network create mynetwork
docker run \
--name mariadb \
-e MYSQL_ROOT_PASSWORD=1234 \
--network mynetwork \
-p 3306:3306 -d mariadb
docker run -d \
--name spark-master \
-p 8080:8080 \
-p 7077:7077 \
-p 4040:4040 \
--network mynetwork \
-v $(pwd):/workspace \
bitnami/spark:latest
ubuntu@DESKTOP-SCOK45O:~/myspark$ docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                                                                                                                                  NAMES
21821ec181c9   bitnami/spark:latest   "/opt/bitnami/script…"   30 minutes ago   Up 30 minutes   0.0.0.0:4040->4040/tcp, [::]:4040->4040/tcp, 0.0.0.0:7077->7077/tcp, [::]:7077->7077/tcp, 0.0.0.0:8080->8080/tcp, [::]:8080->8080/tcp   spark-master
8d33b712bf9f   mariadb                "docker-entrypoint.s…"   31 minutes ago   Up 31 minutes   0.0.0.0:3306->3306/tcp, [::]:3306->3306/tcp                                                                                             mariadb
ubuntu@DESKTOP-SCOK45O:~/myspark$ docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' mariadb
172.18.0.2
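Incidentally, since both containers are attached to the user-defined network mynetwork, Docker's embedded DNS resolves container names, so the connection string could use the name mariadb instead of looking up the IP:
val jdbcUrl = "jdbc:mysql://mariadb:3306/mysql"  // container name resolved by Docker's DNS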
Inside the spark-master container (e.g., after docker exec -it spark-master bash), start spark-shell with the MariaDB JDBC driver on the classpath; the jar is available under /workspace via the -v $(pwd):/workspace mount:
spark-shell --jars /workspace/mariadb-java-client-2.6.2.jar
// Connection settings for the MariaDB container; MariaDB Connector/J 2.x
// also accepts jdbc:mysql:// URLs, so this scheme works with org.mariadb.jdbc.Driver.
val jdbcUrl = "jdbc:mysql://172.18.0.2:3306/mysql"
val dbTable = "user"
val dbProperties = new java.util.Properties()
dbProperties.setProperty("user", "root")
dbProperties.setProperty("password", "1234")
dbProperties.setProperty("driver", "org.mariadb.jdbc.Driver")

// Read the mysql.user table into a DataFrame over JDBC
val df = spark.read.jdbc(jdbcUrl, dbTable, dbProperties)
scala> df.count()
res6: Long = 6
scala> df.printSchema()
root
|-- Host: string (nullable = true)
|-- User: string (nullable = true)
|-- Password: string (nullable = true)
|-- Select_priv: string (nullable = true)
scala> df.select("User","Host").show(10, true)
+-----------+---------+
|       User|     Host|
+-----------+---------+
|       root|        %|
|healthcheck|127.0.0.1|
|healthcheck|      ::1|
|healthcheck|localhost|
|mariadb.sys|localhost|
|       root|localhost|
+-----------+---------+
Writing a DataFrame back to MariaDB is symmetric:
df.write.mode("overwrite").jdbc(jdbcUrl, "my_table", dbProperties)
Now load the CSV file containing 48 million rows:
scala> val df = spark.read.option("header", "true").csv("/workspace/job_summary.csv")
df: org.apache.spark.sql.DataFrame = [job_link: string, job_summary: string]
scala> df.count()
res3: Long = 48219735
Writing the full DataFrame to MariaDB took about 9 minutes:
df.write.mode("overwrite").jdbc(jdbcUrl, "myjob", dbProperties)
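To measure and potentially shorten that time, here is a hedged sketch (repartition and the batchsize option are standard Spark APIs, but the values are untested guesses for this setup):
spark.time {                              // SparkSession.time prints the elapsed time
  df.repartition(8)                       // 8 partitions => 8 concurrent JDBC connections
    .write
    .mode("overwrite")
    .option("batchsize", "10000")         // rows per batched INSERT (default 1000)
    .jdbc(jdbcUrl, "myjob", dbProperties)
}
How much this helps depends on the Spark side's core count and on MariaDB's max_connections and disk throughput.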
Checking the loaded data from the MariaDB client:
MariaDB [mysql]> select * from myjob limit 10;
+------------------------------------------------------+-------------+
| job_link | job_summary |
+------------------------------------------------------+-------------+
| Education Requirements | NULL |
| Board Certified/Board Eligible in Emergency Medicine | NULL |
| ACLS certified | NULL |
| Experience | NULL |
| Experience with LMR | NULL |
| 3 years of emergency room experience preferred | NULL |
| Experience with EPIC preferred. | NULL |
| Primary Location | NULL |
| MA-Oak Bluffs-MVH Martha's Vineyard Hospital | NULL |
| Work Locations | NULL |
+------------------------------------------------------+-------------+
10 rows in set (0.001 sec)
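As an aside, every job_summary above is NULL and job_link holds what look like fragments of summary text, the usual symptom of a CSV whose quoted fields contain embedded newlines. A hedged fix, assuming the file uses standard CSV quoting, is Spark's multiLine option:
val dfMulti = spark.read
  .option("header", "true")
  .option("multiLine", "true")   // let quoted fields span multiple lines
  .option("escape", "\"")        // treat doubled quotes as escaped quotes
  .csv("/workspace/job_summary.csv")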
Counting the rows (all 48 million) took 1 minute 6 seconds; InnoDB stores no exact row count, so COUNT(*) must scan a whole index:
MariaDB [mysql]> select count(*) from myjob;
+----------+
| count(*) |
+----------+
| 48219735 |
+----------+
1 row in set (1 min 6.746 sec)
The on-disk size inside the MariaDB container:
root@8d33b712bf9f:/var/lib/mysql/mysql# du -sh /var/lib/mysql/mysql
5.8G /var/lib/mysql/mysql
MariaDB [mysql]> SELECT table_name, round((data_length + index_length) / 1024 / 1024, 2) AS size_mb
-> FROM information_schema.tables
-> WHERE table_schema = 'mysql' AND table_name = 'myjob';
+------------+---------+
| table_name | size_mb |
+------------+---------+
| myjob | 5410.00 |
+------------+---------+
1 row in set (0.002 sec)
root@8d33b712bf9f:/var/lib/mysql/mysql# ls -lh /var/lib/mysql/mysql/myjob.ibd
-rw-rw---- 1 mysql mysql 5.8G Mar 25 08:15 /var/lib/mysql/mysql/myjob.ibd
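The .ibd file (5.8G) is somewhat larger than the data_length + index_length figure (about 5.3 GB) because the tablespace also contains free pages and internal metadata that information_schema does not count.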