Mysql 대용량 데이터 Update

Q·2023년 6월 23일
0

MySQL

목록 보기
5/5

✅ 개요

  • 회사에서 업무중 문의가 왔다.
  • Target DB에서 특정 URL이 들어간 Row 값들 중 한 Column의 값이 모두 죽은 문서로 표시되는 값으로 표시되어 있는 것
  • 하지만 해당 URL은 살아 있으므로 이 특정 URL의 Column을 살아있는 값으로 변경하는 작업을 진행했다.
  • 접근해서 update를 하려는 Table의 크기는 13419802개의 row들이 있으며 id라는 컬럼이 primary key이며 인덱스auto_increment가 지정되어있다.

✅ 해결 코드

import pymysql
import time

class UrlProcessor:
    def __init__(self):
        self.host = "host"
        self.port = "port"
        self.user = "user"
        self.password = "pw"
        self.db = "db"

    def save_checkpoint(self, id):
        with open("checkpoint.txt", "w") as file:
            file.write(str(id))

    def load_checkpoint(self):
        try:
            with open("checkpoint.txt", "r") as file:
                return int(file.read().strip())
        except FileNotFoundError:
            return 0

    def fetch_all_rows(self, id):
        try:
            connection = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,
                                         db=self.db, connect_timeout=60, read_timeout=120, charset="utf8")

            with connection.cursor() as cursor:
                sql = """
                    SELECT id FROM table_name
                    WHERE uid LIKE "%%target_url%%" AND id > %s
                    ORDER BY id
                    LIMIT 100
                """
                cursor.execute(sql, (id,))
                return cursor.fetchall()

        except pymysql.err.OperationalError as e:
            print("Error in fetch_rows:", str(e))
            return None
        finally:
            connection.close()

    def fetch_target_rows(self, id):
        try:
            connection = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,
                                         db=self.db, connect_timeout=60, read_timeout=120, charset="utf8")

            with connection.cursor() as cursor:
                sql = """
                    SELECT id FROM table_name
                    WHERE uid LIKE "%%target_url%%" AND status = 'DEAD' AND id > %s
                    ORDER BY id
                    LIMIT 100
                """
                cursor.execute(sql, (id,))
                return cursor.fetchall()

        except pymysql.err.OperationalError as e:
            print("Error in fetch_rows:", str(e))
            return None
        finally:
            connection.close()

    def update_rows(self, ids_to_update):
        try:
            connection = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,
                                         db=self.db, connect_timeout=60, read_timeout=120, charset="utf8")

            with connection.cursor() as cursor:
                update_sql = """
                    UPDATE table_name
                    SET status = "ALIVE"
                    WHERE id IN (%s)
                """ % ",".join(map(str, ids_to_update))

                cursor.execute(update_sql)
                connection.commit()

        except pymysql.err.OperationalError as e:
            print("Error in update_rows:", str(e))
        finally:
            connection.close()

    def change_column(self):
        id = self.load_checkpoint()

        while True:
            try:
                all_rows = self.fetch_all_rows(id)
                target_rows = self.fetch_target_rows(id)

                if not all_rows:
                    print("No more rows to process")
                    return

                if target_rows:
                    ids_to_update = [row[0] for row in target_rows]
                    self.update_rows(ids_to_update)

                id = all_rows[-1][0]
                print(f"Last processed ID: {id}")
                self.save_checkpoint(id)

            except (pymysql.err.OperationalError, pymysql.err.IntegrityError) as e:
                print("Error:", str(e))
                print("Recovering from error...")
                time.sleep(5)
                id = self.load_checkpoint()

            except Exception as e:
                print("Error:", str(e))
                print("Aborting the process")
                return

            time.sleep(1)

if __name__ == "__main__":
    processor = UrlProcessor()
    processor.change_column()

⚡ 클래스 구조

  1. UrlProcessor 클래스는 init 생성자를 사용하여 MySQL 서버에 연결하는데 필요한 정보를 초기화
  2. save_checkpoint 메소드는 주어진 ID를 파일에 저장하여, 프로그램이 다시 시작될 때 해당 ID부터 처리를 재개할 수 있게 한다.
  3. load_checkpoint 메소드는 파일에서 마지막으로 저장된 ID를 불러온다.
  4. fetch_all_rows 메소드는 데이터베이스에서 주어진 ID보다 큰 모든 로우들을 가져온다. 이 메소드는 체크포인트를 기반으로 작동한다.
  5. fetch_target_rows 메소드는 상태가 'DEAD'인 로우들을 가져온다. 이 메소드 역시 체크포인트를 기반으로 작동한다.
  6. update_rows 메소드는 주어진 ID 리스트에 해당하는 로우들의 상태를 "ALIVE"로 업데이트한다.
  7. change_column 메소드는 주요 로직을 담고 있으며, 루프를 사용하여 로우들을 쿼리하고 필요한 경우 업데이트한다. 또한 이 메소드는 예외 처리를 사용하여 오류가 발생할 경우 복구한다.

⚡ 주요 기술 및 전략

  1. 체크포인트(Checkpoint): 프로그램이 중단되었을 때, 마지막으로 처리한 위치를 기록하여 처리를 재개할 수 있도록 한다. 여기서는 id 값을 파일에 저장하여 체크포인트로 사용한다.

  2. 페이징(Paging): LIMIT와 ORDER BY를 사용하여 쿼리 결과를 일정한 크기로 분할하여 처리한다. 이는 메모리 사용량을 줄이고, 대량의 데이터를 효율적으로 처리하는데 도움이 된다.

  3. 배치 업데이트(Batch Update): 한 번에 여러 로우를 업데이트하여 데이터베이스에 적용하는 데 필요한 요청 수를 줄인다. 이는 네트워크 지연 시간을 줄이고 성능을 향상시킨다.

  4. 예외 처리(Exception Handling): 데이터베이스 연결 문제나 기타 오류를 처리하기 위해 예외 처리를 사용한다. 이를 통해 프로그램이 예기치 않은 상황에서 중단되는 것을 방지하고, 필요한 경우 오류 복구 절차를 실행할 수 있다.

  5. 인덱싱(Indexing): 여기서는 코드에서 직접 인덱싱을 다루지는 않지만, id 기반으로 쿼리를 수행하므로 id 컬럼에 인덱스가 있을 경우 쿼리 성능이 향상된다.

  6. 타임 아웃(Timeouts) 및 슬립(Sleep): 데이터베이스 연결과 쿼리에 타임아웃을 설정하여 무한 대기 상태를 방지하고, 루프 내에서 일정 시간 동안 대기하여 자원을 절약한다.

profile
Data Engineer

0개의 댓글