import pymysql
import time
class UrlProcessor:
def __init__(self):
self.host = "host"
self.port = "port"
self.user = "user"
self.password = "pw"
self.db = "db"
def save_checkpoint(self, id):
with open("checkpoint.txt", "w") as file:
file.write(str(id))
def load_checkpoint(self):
try:
with open("checkpoint.txt", "r") as file:
return int(file.read().strip())
except FileNotFoundError:
return 0
def fetch_all_rows(self, id):
try:
connection = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,
db=self.db, connect_timeout=60, read_timeout=120, charset="utf8")
with connection.cursor() as cursor:
sql = """
SELECT id FROM table_name
WHERE uid LIKE "%%target_url%%" AND id > %s
ORDER BY id
LIMIT 100
"""
cursor.execute(sql, (id,))
return cursor.fetchall()
except pymysql.err.OperationalError as e:
print("Error in fetch_rows:", str(e))
return None
finally:
connection.close()
def fetch_target_rows(self, id):
try:
connection = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,
db=self.db, connect_timeout=60, read_timeout=120, charset="utf8")
with connection.cursor() as cursor:
sql = """
SELECT id FROM table_name
WHERE uid LIKE "%%target_url%%" AND status = 'DEAD' AND id > %s
ORDER BY id
LIMIT 100
"""
cursor.execute(sql, (id,))
return cursor.fetchall()
except pymysql.err.OperationalError as e:
print("Error in fetch_rows:", str(e))
return None
finally:
connection.close()
def update_rows(self, ids_to_update):
try:
connection = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password,
db=self.db, connect_timeout=60, read_timeout=120, charset="utf8")
with connection.cursor() as cursor:
update_sql = """
UPDATE table_name
SET status = "ALIVE"
WHERE id IN (%s)
""" % ",".join(map(str, ids_to_update))
cursor.execute(update_sql)
connection.commit()
except pymysql.err.OperationalError as e:
print("Error in update_rows:", str(e))
finally:
connection.close()
def change_column(self):
id = self.load_checkpoint()
while True:
try:
all_rows = self.fetch_all_rows(id)
target_rows = self.fetch_target_rows(id)
if not all_rows:
print("No more rows to process")
return
if target_rows:
ids_to_update = [row[0] for row in target_rows]
self.update_rows(ids_to_update)
id = all_rows[-1][0]
print(f"Last processed ID: {id}")
self.save_checkpoint(id)
except (pymysql.err.OperationalError, pymysql.err.IntegrityError) as e:
print("Error:", str(e))
print("Recovering from error...")
time.sleep(5)
id = self.load_checkpoint()
except Exception as e:
print("Error:", str(e))
print("Aborting the process")
return
time.sleep(1)
if __name__ == "__main__":
processor = UrlProcessor()
processor.change_column()
UrlProcessor
클래스는 init 생성자를 사용하여 MySQL 서버에 연결하는데 필요한 정보를 초기화save_checkpoint
메소드는 주어진 ID를 파일에 저장하여, 프로그램이 다시 시작될 때 해당 ID부터 처리를 재개할 수 있게 한다.load_checkpoint
메소드는 파일에서 마지막으로 저장된 ID를 불러온다.fetch_all_rows
메소드는 데이터베이스에서 주어진 ID보다 큰 모든 로우들을 가져온다. 이 메소드는 체크포인트를 기반으로 작동한다.fetch_target_rows
메소드는 상태가 'DEAD'인 로우들을 가져온다. 이 메소드 역시 체크포인트를 기반으로 작동한다.update_rows
메소드는 주어진 ID 리스트에 해당하는 로우들의 상태를 "ALIVE"로 업데이트한다.change_column
메소드는 주요 로직을 담고 있으며, 루프를 사용하여 로우들을 쿼리하고 필요한 경우 업데이트한다. 또한 이 메소드는 예외 처리를 사용하여 오류가 발생할 경우 복구한다.체크포인트(Checkpoint): 프로그램이 중단되었을 때, 마지막으로 처리한 위치를 기록하여 처리를 재개할 수 있도록 한다. 여기서는 id 값을 파일에 저장하여 체크포인트로 사용한다.
페이징(Paging): LIMIT와 ORDER BY를 사용하여 쿼리 결과를 일정한 크기로 분할하여 처리한다. 이는 메모리 사용량을 줄이고, 대량의 데이터를 효율적으로 처리하는데 도움이 된다.
배치 업데이트(Batch Update): 한 번에 여러 로우를 업데이트하여 데이터베이스에 적용하는 데 필요한 요청 수를 줄인다. 이는 네트워크 지연 시간을 줄이고 성능을 향상시킨다.
예외 처리(Exception Handling): 데이터베이스 연결 문제나 기타 오류를 처리하기 위해 예외 처리를 사용한다. 이를 통해 프로그램이 예기치 않은 상황에서 중단되는 것을 방지하고, 필요한 경우 오류 복구 절차를 실행할 수 있다.
인덱싱(Indexing): 여기서는 코드에서 직접 인덱싱을 다루지는 않지만, id 기반으로 쿼리를 수행하므로 id 컬럼에 인덱스가 있을 경우 쿼리 성능이 향상된다.
타임 아웃(Timeouts) 및 슬립(Sleep): 데이터베이스 연결과 쿼리에 타임아웃을 설정하여 무한 대기 상태를 방지하고, 루프 내에서 일정 시간 동안 대기하여 자원을 절약한다.