HBase 데이터 삭제2

Q·2023년 4월 17일
0

✅ 개요

  • 이전에 개발한 HBase 데이터 삭제는 테이블의 모든 row데이터를 scan하고 Timestamp를 기준으로 이전의 데이터들을 삭제하는 단순한 코드였다.
  • 하지만 역시 대규모의 데이터를 하나의 프로세스로 scan하고 삭제하기까지는 시간이 너무 오래 걸렸다.
  • 그래서 이번에는 더 나아가 HBase의 특징인 region을 이용하여 데이터를 삭제하기로 했다.

⚡ HBase의 region

  • HBase의 region은 여러개의 row key 범위를 나타내는 단위이며 HBase에서는 데이터를 저장할 때 row key를 기준으로 파티셔닝을 수행하며, 각 파티션이 region으로 나뉘어진다.
  • region은 서로 겹치지 않는 연속적인 row key 범위를 가지고 각 region은 메모리와 디스크에 캐시되며, 사용자는 region 수준에서 데이터를 스캔하거나 읽고 쓸 수 있다.
  • 이번에 작성할 파이썬 코드의 핵심은 multi threading을 사용하여 하나의 region당 하나의 thread를 할당하여 병렬로 프로세스를 실행시킬 것이다.

✅ 사용한 Server 및 언어

ServerLanguage
Centos7Python3.7

✅ Python 설치

✅ Python Code

  • HBase 테이블에서 특정 시간 이전의 데이터를 삭제하는 스크립트 작성2
  • multi threading을 사용하여 하나의 region당 하나의 thread를 할당하여 병렬로 프로세스를 실행
import concurrent.futures
import time
import happybase
from datetime import datetime
import random
import socket
import sys

socket.setdefaulttimeout(10)

# HBase 정보
HOST = ["host1", "host2"] # Host
table_name = "테이블 이름" # 테이블 이름

dt = datetime(2020, 1, 1) # 예) 2022년 1월 1일 이전의 데이터를 삭제
target_timestamp = int(dt.timestamp() * 1000)
scan_batch_size = 1000  # 한 번에 스캔할 데이터 수를 설정

# HBase Connect
def getHConnection(num=None):
    while 1:
        try:
            if num == None:
                num = random.randint(0, len(HOST) - 1)
            server = HOST[num]
            connection = happybase.Connection(server, port=9090, timeout=60000)
            connection.open()
            return connection
        except Exception as msg:
            print("error:", msg)
            time.sleep(1)
            num = None

# start_row와, end_row를 받아 범위 내의 timestamp 이하의 데이터 삭제
def delete_data(worker_id, start_row, end_row):
    print("현재 region: ",worker_id)
    st_row = start_row
    while True:
        connection = getHConnection()
        table = connection.table(table_name)  

        row_list = []
        last_row_key = None

        for row_key, data in table.scan(row_start=st_row, row_stop=end_row, include_timestamp=True):
            for column, (value, timestamp) in data.items():
                if timestamp < target_timestamp:
                    row_list.append(row_key)
                    break
                else:
                    break

            last_row_key = row_key

            if len(row_list) >= scan_batch_size:
                break

        for row_key in row_list:
            table.delete(row_key)
            print(f"Deleted row_key: {row_key}")

        if last_row_key:
            st_row = last_row_key  # 체크포인트를 마지막 row_key로 설정
        else:
            break  # 스캔이 완료되면 루프를 종료

        connection.close()
        time.sleep(1)  # 연결이 끊어진 후 다음 스캔을 시작하기 전에 일시 중지

if __name__ == '__main__':
    start_row = None  # 초기 시작 행을 None으로 설정
    end_row = None # end_row 초기화

    # 리전 정보 가져오기
    connection = getHConnection()
    table = connection.table(table_name)
    regions = connection.client.getTableRegions(table.name) # 해당 테이블의 모든 리전 정보를 가져온다.
    connection.close()

    # 리전 수 별로 멀티 스레딩
    num_processes = len(regions) # 이 부분이 중요!!!!! 
    print("regions number:", num_processes) # 리전 갯수 출력!

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_processes) as executor:
        # 각 리전의 start row 와 end row 구하기
        for i, region in enumerate(regions):
            start_row = region.startKey
            end_row = region.endKey

            try:
                future = executor.submit(delete_data, i, start_row, end_row)
            except Exception as msg:
                print("error:", msg)
                sys.exit()
  • getHConnection: 주어진 호스트 중 하나에 연결을 시도하고, 성공한 경우 HappyBase Connection 객체를 반환, 연결에 실패하면 다른 호스트로 시도한다.
  • dt = datetime(2022, 1, 1): 삭제할 날짜를 지정한다. 이 예에서는 2022년 1월 1일 이전의 데이터를 삭제한다.
  • scan_batch_size, start_row: 한 번에 스캔할 데이터 수를 지정하고, 초기 시작 행을 None으로 설정
  • for row_key, data in table.scan(row_start=start_row, include_timestamp=True)
    • for문을 사용하여 테이블을 스캔하고, 특정 시간 이전의 데이터를 찾는다. 찾은 데이터의 행 키를 row_list에 추가한다.
    • 스캔이 끝날 때까지 행을 스캔하고, scan_batch_size에 도달하면 루프를 멈춘다.(1000개씩 데이터를 처리하기 위함)
  • table.delete(row_key): row_list에 저장된 행 키에 대해 삭제 작업을 수행하고, 삭제된 행 키를 출력한다.
  • 스캔이 끝나지 않았다면, 마지막으로 스캔한 행 키를 시작 행으로 설정하고, 다음 스캔을 시작하기 전에 1초간 대기한다.
  • if __name__ == '__main__'
    • regions = connection.client.getTableRegions(table.name): HBase 서버에 연결하고 테이블의 리전 정보를 가져온다.
    • 각 리전의 시작 행과 종료 행을 구한다.
    • ThreadPoolExecutor를 사용하여 각 리전에 대해 delete_data 함수를 병렬로 실행한다. 각 리전은 별도의 thread에서 처리된다.
    • num_processes = len(regions): 멀티 스레딩을 실행시킬 갯수로 만약 이대로 실행을 시키면 하나의 thread에 하나의 region이 할당되어 실행된다. 하지만 이 말은 만약 데이터양이 많아 ex) 8000개의 region 갯수하면 스레드도 8000개가 실행됨 스레드가 너무 많이 실행된다면 서버에 부담이되어 timeout을 내뱉을수도 있으니 적당히 한번에 실행시킬 스레드의 갯수를 조절해야한다. (필자는 100개의 스레드를 실행하도록 맞췄다.)
    • 예외가 발생하면 오류 메시지를 출력하고 프로그램을 종료한다.

✅ 더 알아둬야할 지식들

⚡ ProcessPoolExecutor와 ThreadPoolExecutor

1. ProcessPoolExecutor

  • 멀티 프로세싱을 지원한다. 여러 개의 독립적인 프로세스를 생성하여 작업을 병렬로 실행한다.
  • 각 프로세스는 독립적인 메모리 공간을 가진다. 따라서 전역 인터프리터 락(Global Interpreter Lock, GIL)에 영향을 받지 않으며, CPU 바운드 작업에서 성능 향상을 얻을 수 있다.
  • 프로세스간 통신은 피클링(pickling)을 통해 진행, 작업에 전달되는 인자와 결과는 피클링 가능해야 한다.
  • 작업을 생성하고 관리하는 오버헤드가 상대적으로 크기 때문에, I/O 바운드 작업에는 적합하지 않을 수 있다.

2. ThreadPoolExecutor

  • 멀티 스레딩을 지원한다. 하나의 프로세스 내에서 여러 개의 스레드를 생성하여 작업을 병렬로 실행한다.
  • 모든 스레드는 같은 메모리 공간을 공유한다. 전역 인터프리터 락(GIL) 때문에, 한 번에 하나의 스레드만 실행될 수 있어, CPU 바운드 작업에서 성능 향상을 기대하기 어렵다.
  • 작업을 생성하고 관리하는 오버헤드가 상대적으로 작기 때문에, I/O 바운드 작업에 더 적합하다.
  • 스레드간 통신이 별도의 직렬화 과정이 필요하지 않는다. 작업에 전달되는 인자와 결과가 피클링 가능할 필요가 없습니다.

⚡ I/O 바운드(I/O-bound)와 CPU 바운드(CPU-bound)

1. I/O 바운드 (I/O-bound)

  • I/O 바운드는 프로그램의 실행 성능이 입력/출력(I/O) 작업, 예를 들어 디스크, 네트워크, 사용자 입력 등에 의해 제한되는 경우를 의미한다.
  • I/O 바운드 작업에서는 대부분의 시간이 데이터를 주고받는 데 사용되며, CPU의 처리 능력이 프로그램 성능에 큰 영향을 미치지 않는다.
  • 이러한 작업에서는 CPU가 대기 상태로 많은 시간을 보내게 되어, 다른 작업을 처리할 수 있는 여유가 생긴다.
  • I/O 바운드 작업을 최적화하는 일반적인 방법 중 하나는 비동기 처리나 병렬 처리를 사용하여 I/O 작업을 최소화하는 것이다.
  • I/O 바운드의 주된 제약 요소는 네트워크와 디스크I/O 성능

2. CPU 바운드 (CPU-bound)

  • CPU 바운드는 프로그램의 실행 성능이 CPU의 처리 능력에 의해 제한되는 경우를 의미한다.
  • CPU 바운드 작업에서는 대부분의 시간이 CPU가 계산을 수행하는 데 사용되며, 입력/출력 작업이 병목 현상을 일으키지 않는다.
  • CPU 바운드 작업의 성능은 CPU의 처리 속도와 코어 수에 크게 영향을 받는다.
  • CPU 바운드 작업을 최적화하는 일반적인 방법 중 하나는 알고리즘을 개선하거나, 병렬 처리를 사용하여 여러 CPU 코어에서 동시에 실행되도록 하는 것이다.
  • CPU 바운드 작업의 주된 제약 요소는 CPU 자체의 처리 능력이며, 메모리 성능이 큰 영향을 미치지는 않는다.

⚡ Python에서 GIL과 멀티 스레드

  • GIL 때문에 한 번에 하나의 스레드만 실행되지만, 운영체제가 스레드를 빠르게 전환하면서 여러 스레드가 병렬로 실행되는 것처럼 보인다.
  • 따라서 실제로 병렬 처리는 아니지만, 스레드 전환이 빠르기 때문에 동시에 실행되는 것처럼 느껴지는 것이다.
  • I/O 바운드 작업의 경우, 스레드는 I/O 작업을 기다리는 동안 다른 스레드가 실행된다.
  • 이렇게 함으로써 I/O 작업을 기다리는 시간 동안 다른 작업을 수행할 수 있어 효율이 향상된다.
  • 하지만 CPU 바운드 작업의 경우, 작업이 계산에 많은 시간을 소비하기 때문에 여러 스레드가 동시에 실행되면 이론적으로 성능 향상이 가능하지만 GIL 때문에 한 번에 하나의 스레드만 실행되어 스레드 간의 병렬 처리가 어렵게 된다.

간단히 말해, GIL이 있더라도 I/O 바운드 작업에서는 스레드를 효과적으로 사용할 수 있지만, CPU 바운드 작업에서는 성능 제약이 있다. 이 때문에 CPU 바운드 작업에서는 멀티 프로세스를 사용하는 것이 좋다.

✅ python 백그라운드 실행

$ nohup python 스크립트.py &
profile
Data Engineer

0개의 댓글